
Introduction
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
Flink clusters come in two flavors: the Flink session cluster and the Flink job cluster. A job cluster is a dedicated cluster that runs a single job; the job is part of the image, so there is no need for a separate job submission. A session cluster, in contrast, can run multiple jobs, each of which must be submitted after the cluster has been deployed.
Flink Session Cluster
A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A basic session cluster deployment in Kubernetes has three components:
- a Deployment/Job which runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager’s REST and UI ports
Deploying on Kubernetes
We will be deploying this application on Kubernetes using Deployments (for both the JobManager and the TaskManagers) and Services (to expose the ports and the UI).
For those using Minikube, this command must be executed before deploying the resources:
minikube ssh 'sudo ip link set docker0 promisc on'
Without it, the components are not able to reach themselves through a Kubernetes Service.
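To confirm the setting took effect, the interface flags can be inspected; docker0 should now list PROMISC among them:
minikube ssh 'ip link show docker0'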
Now, let us look at our YAML files to deploy our application:
jobmanager-deployment.yaml
We are using the flink:latest image and have set the resource limits to 512 MiB of memory and 0.5 CPU cores.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
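Optionally, a livenessProbe can be added under the jobmanager container so Kubernetes restarts it if the RPC port stops responding. A minimal sketch, with illustrative timings:
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60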
taskmanager-deployment.yaml
We are again using the flink:latest image, with the same resource limits of 512 MiB of memory and 0.5 CPU cores.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
        args:
        - taskmanager
        - "-Dtaskmanager.host=$(K8S_POD_IP)"
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: K8S_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
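Since the TaskManagers run as an ordinary Deployment, the pool can be resized later with kubectl; for example, to grow it to four replicas:
kubectl scale deployment flink-taskmanager --replicas=4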
jobmanager-service.yaml
We will be creating a Service object to expose the JobManager's ports inside the cluster; its name is what the TaskManagers connect to via JOB_MANAGER_RPC_ADDRESS.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
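If in doubt, the Service name can be resolved from a throwaway pod to verify that cluster DNS is working (the busybox image here is just one convenient choice):
kubectl run dns-test --rm -it --restart=Never --image=busybox -- nslookup flink-jobmanager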
jobmanager-rest-service.yaml
A NodePort Service to expose the REST and UI port outside the cluster.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
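Since the type is NodePort, Kubernetes assigns a port on the node. On Minikube the resulting URL can be printed directly, or the UI can be forwarded to localhost:8081:
# Print the NodePort URL on Minikube
minikube service flink-jobmanager-rest --url
# Or forward the UI to localhost:8081
kubectl port-forward service/flink-jobmanager-rest 8081:8081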
Now let us deploy these Kubernetes objects using the kubectl command:
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-rest-service.yaml
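Alternatively, assuming these four manifests are the only files in the current directory, they can all be applied in one command:
kubectl apply -f .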
To check the status of our objects, we can use the following commands:
# To view deployments
kubectl get deploy
# To view services
kubectl get svc
# To view pods
kubectl get po
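Once all pods are Running, the JobManager log should show both TaskManagers registering. A quick way to check, assuming the deployment name above:
# View the JobManager log
kubectl logs deployment/flink-jobmanager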
Opening the Flink web UI in a browser (at localhost:8081 when port-forwarding, or at the NodePort URL obtained above), we will get a page similar to this:

In our case, however, Available Task Slots will be 2, since we deployed two TaskManager replicas and each provides one slot by default.
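The same numbers are exposed through Flink's REST API; with the UI reachable at localhost:8081, the cluster overview (including total task slots) can be fetched with curl:
curl http://localhost:8081/overview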
Conclusion
In this blog, we looked at how to deploy a simple Apache Flink session cluster on Kubernetes and saw how easy it is to do. Next, we can look at how to submit jobs to our Flink cluster and how those jobs are executed.
