Flink on Kubernetes

Introduction

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, perform computations at in-memory speed, and scale to any size.

Flink can run on Kubernetes as one of two kinds of cluster: a session cluster or a job cluster. A job cluster is a dedicated cluster that runs a single job; the job is part of the image, so no extra job submission is needed. A session cluster is executed as a long-running Kubernetes Deployment and can run multiple jobs, each of which must be submitted to the cluster after the cluster has been deployed.

Flink Session Cluster

A basic session cluster deployment in Kubernetes has three components:

  • a Deployment/Job which runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Deploying on Kubernetes

We will deploy the cluster on Kubernetes using Deployments (for both the JobManager and the TaskManagers) and Services (to expose the ports and the web UI).

If you are using Minikube, run this command before deploying the resources:

minikube ssh 'sudo ip link set docker0 promisc on'

Without it, the Flink components cannot reach themselves through their own Kubernetes Service.

Now, let us look at our YAML files to deploy our application:

jobmanager-deployment.yaml

We use the flink:latest image and set resource limits of 512Mi of memory and 0.5 CPU cores (500m).

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager

taskmanager-deployment.yaml

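The TaskManagers also use the flink:latest image with the same limits of 512Mi of memory and 0.5 CPU cores. In addition, each TaskManager reads its pod IP from status.podIP into the K8S_POD_IP environment variable and passes it to Flink as taskmanager.host, so it registers with the JobManager under an address that other pods can reach.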

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
        args:
        - taskmanager
        - "-Dtaskmanager.host=$(K8S_POD_IP)"
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: K8S_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP

jobmanager-service.yaml

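A Service named flink-jobmanager exposes the JobManager's RPC, blob, query and UI ports inside the cluster. Its name matches the JOB_MANAGER_RPC_ADDRESS value, so the TaskManagers reach the JobManager through this Service.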

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml

A second Service, of type NodePort, exposes the REST API and web UI port (8081) outside the cluster.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
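Because this manifest does not set a nodePort, Kubernetes assigns one from the node-port range (30000-32767 by default), so the UI is not published on port 8081 of the node. Once the objects are deployed, a minimal sketch like the following prints the resulting URL. It assumes Minikube (for `minikube ip`), and `flink_nodeport_url` is a hypothetical helper name.

```shell
# Hypothetical helper: print the URL at which the flink-jobmanager-rest NodePort
# Service is reachable. The manifest sets no nodePort, so Kubernetes picked one.
flink_nodeport_url() {
  local node_port
  node_port=$(kubectl get service flink-jobmanager-rest \
    -o jsonpath='{.spec.ports[0].nodePort}')
  # Assumes Minikube; on other clusters, substitute the IP of any node.
  echo "http://$(minikube ip):${node_port}"
}
```

With some Minikube drivers the node IP is not routable from the host; in that case `minikube service flink-jobmanager-rest --url` prints a reachable URL directly.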

Now let us deploy these Kubernetes objects with kubectl:

kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-rest-service.yaml
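`kubectl create` returns as soon as the objects are accepted, not when the pods are running. A minimal sketch for waiting until both Deployments are available, using `kubectl rollout status`; the helper name `wait_for_flink` and the two-minute timeout are assumptions.

```shell
# Hypothetical helper: block until both Flink Deployments have rolled out,
# giving up (non-zero exit) if either takes longer than two minutes.
wait_for_flink() {
  kubectl rollout status deployment/flink-jobmanager --timeout=120s &&
    kubectl rollout status deployment/flink-taskmanager --timeout=120s
}
```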

To check the status of the objects, use the following commands:

# To view deployments
kubectl get deploy

# To view services
kubectl get svc

# To view pods
kubectl get po
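Since every object in the manifests carries the label app=flink, the listing can be narrowed to the Flink pods. The sketch below also shows the pod IPs (the addresses the TaskManagers advertise as taskmanager.host) and the endpoints behind the flink-jobmanager Service, which confirms that its selector matches the JobManager pod. The helper name `show_flink_status` is hypothetical.

```shell
# Hypothetical helper: list only the Flink pods with their IPs and nodes,
# then show which pod IP:port pairs back the flink-jobmanager Service.
show_flink_status() {
  kubectl get pods -l app=flink -o wide
  # ENDPOINTS shows <none> if the selector matched no ready pod.
  kubectl get endpoints flink-jobmanager
}
```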

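Opening localhost:8081 in a browser shows the Flink web dashboard. Note that a NodePort Service is not published on localhost:8081 by itself, so the port first has to be forwarded to your machine, for example with kubectl port-forward.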
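A minimal sketch of that forwarding step, using `kubectl port-forward` against the flink-jobmanager-rest Service; the helper name `forward_flink_ui` is an assumption.

```shell
# Hypothetical helper: forward local port 8081 to port 8081 of the
# flink-jobmanager-rest Service so http://localhost:8081 reaches the web UI.
# Runs in the foreground until interrupted with Ctrl+C.
forward_flink_ui() {
  kubectl port-forward service/flink-jobmanager-rest 8081:8081
}
```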

In our case, Available Task Slots should show 2: we deployed two TaskManager replicas, and each TaskManager provides one slot by default (taskmanager.numberOfTaskSlots: 1).
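The slot count can also be checked without a browser: the JobManager's REST endpoint /overview returns a JSON object with fields such as taskmanagers, slots-total and slots-available. The sketch below assumes the port is already forwarded to localhost:8081 and uses a hypothetical helper, `overview_field`, that extracts one numeric field with sed. It assumes a flat, single-line JSON object; jq is the sturdier choice if it is installed.

```shell
# Hypothetical helper: read the JSON body of Flink's /overview endpoint on stdin
# and print the integer value of the field named in $1 (e.g. slots-total).
overview_field() {
  sed -n "s/.*\"$1\":[[:space:]]*\([0-9][0-9]*\).*/\1/p"
}
```

For example, `curl -s http://localhost:8081/overview | overview_field slots-total` should print 2 for this setup. After `kubectl scale deployment/flink-taskmanager --replicas=3`, once the new pod has registered, it should print 3.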

Conclusion

In this blog, we looked at how to deploy a simple Apache Flink session cluster on Kubernetes and how easy it is to do so. As a next step, we can look at how to submit jobs to this cluster and how those jobs are executed.