Flink Architecture And Cluster Deployment


In this blog, we will be discussing Flink Architecture and its core components.


  • Flink is a distributed system and requires effective allocation and management of compute resources in order to execute streaming applications.
  • It integrates with all common cluster resource managers such as Hadoop YARN and Kubernetes.
  • In addition, it can run as a standalone cluster or even as a library.
  • The Flink runtime consists of two types of processes: a JobManager and one or more TaskManagers.

  • The Client is not part of the runtime and program execution.
  • Its role is to prepare and send a dataflow to the JobManager.
  • After that, the client can disconnect (detached mode), or stay connected to receive progress reports (attached mode).
  • The client runs either as part of the Java/Scala program that triggers the execution, or in the command line process ./bin/flink run ....
  • There are various ways to start the JobManager and TaskManagers:
    • Firstly, directly on the machines as a standalone cluster.
    • Secondly, in containers, or managed by resource frameworks like YARN.
    • TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work.
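As a sketch of the two client submission modes, assuming a Flink distribution directory and the WordCount example JAR that ships with it:

```shell
# Attached mode: the client stays connected and receives progress reports
./bin/flink run ./examples/streaming/WordCount.jar

# Detached mode: the client submits the job and disconnects immediately
./bin/flink run --detached ./examples/streaming/WordCount.jar
```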


The JobManager component in Flink Architecture has a number of responsibilities related to coordinating the distributed execution of Flink Applications:

  • It decides when to schedule the next task (or set of tasks).
  • Reacts to finished tasks or execution failures.
  • Coordinates checkpoints, and coordinates recovery on failures, among others.
  • This process consists of three different components:
    • ResourceManager
      • The ResourceManager is responsible for resource de-/allocation and provisioning in a Flink cluster.
      • It manages task slots, which are the unit of resource scheduling in a Flink cluster.
      • Flink implements multiple ResourceManagers for different environments and resource providers such as YARN, Kubernetes and standalone deployments.
      • In a standalone setup, the ResourceManager can only distribute the slots of available TaskManagers and cannot start new TaskManagers on its own.
    • Dispatcher
      • The Dispatcher provides a REST interface to submit Flink applications for execution and starts a new JobMaster for each submitted job.
      • It also runs the Flink WebUI to provide information about job executions.
    • JobMaster
      • JobMaster is responsible for managing the execution of a single JobGraph. Multiple jobs can run simultaneously in a Flink cluster, each having its own JobMaster.
  • There is always at least one JobManager. A high-availability setup might have multiple JobManagers, one of which is always the leader, and the others are standby.
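For illustration, such a high-availability setup can be configured in flink-conf.yaml using the ZooKeeper-based HA services; the quorum addresses and storage path below are placeholders:

```yaml
# Enable ZooKeeper-based high availability for JobManager failover
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# Durable storage for the metadata a standby JobManager needs on failover
high-availability.storageDir: hdfs:///flink/ha/
```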


  • The TaskManagers (also called workers) execute the tasks of a dataflow, and buffer and exchange the data streams.
  • There must always be at least one TaskManager.
  • The smallest unit of resource scheduling in a TaskManager is a task slot.
  • The number of task slots in a TaskManager indicates the number of concurrent processing tasks.
  • Note that multiple operators may execute in a single task slot.
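As a minimal illustration, the number of slots per TaskManager and the default job parallelism are set in flink-conf.yaml (the values here are arbitrary):

```yaml
# Each TaskManager offers 4 task slots, i.e. 4 concurrent pipelines of tasks
taskmanager.numberOfTaskSlots: 4
# Default parallelism for jobs that do not set one explicitly
parallelism.default: 4
```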

Tasks and Operator Chains

  • For distributed execution, Flink chains operator subtasks together into tasks.
  • A single thread executes each task.
  • Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.
  • The sample dataflow in the figure below executes with five subtasks, and hence with five parallel threads.
(Figure: Operator chaining into Tasks)
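The five-subtask count in the figure can be reproduced with a back-of-the-envelope calculation: each chain of operators becomes one task, and each task runs one subtask (thread) per unit of parallelism. The sketch below is a simplified model (real chaining also depends on slot-sharing groups and per-operator settings), assuming a source→map chain with parallelism 2, a keyed aggregation with parallelism 2, and a sink with parallelism 1:

```python
# Simplified model of operator chaining: operators connected by a local
# forward edge with equal parallelism merge into one task; each task then
# runs one subtask (thread) per parallel instance.

def count_subtasks(pipeline):
    """pipeline: list of (operator_name, parallelism, shuffle_before) tuples."""
    chains = []
    for name, parallelism, shuffle_before in pipeline:
        # A shuffle (e.g. keyBy) or a parallelism change breaks the chain.
        if chains and not shuffle_before and chains[-1][1] == parallelism:
            chains[-1][0].append(name)
        else:
            chains.append(([name], parallelism))
    # One subtask (thread) per parallel instance of each chain.
    return sum(par for _, par in chains), chains

# Source and map chain together (forward edge, same parallelism);
# keyBy forces a shuffle; the sink has a different parallelism.
pipeline = [
    ("source", 2, False),
    ("map", 2, False),
    ("keyed-aggregation", 2, True),
    ("sink", 1, False),
]
subtasks, chains = count_subtasks(pipeline)
print(subtasks)  # 2 (source+map) + 2 (keyed-aggregation) + 1 (sink) = 5
```

This mirrors the figure's result: three tasks, five subtasks, and hence five parallel threads.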
Flink Application Execution

  • The jobs of a Flink Application can either be submitted to a long-running Flink Session Cluster, a dedicated Flink Job Cluster, or a Flink Application Cluster.
  • The difference between these options is mainly related to the cluster’s lifecycle and to resource isolation guarantees.
  • Cluster Lifecycle:
    • In a Flink Session Cluster, the client connects to a pre-existing, long-running cluster that can accept multiple job submissions.
    • Even after all jobs finish, the cluster (and the JobManager) will keep running until we manually stop it.
    • The lifetime of a Flink Session Cluster is therefore not bound to the lifetime of any Flink Job.
  • Resource Isolation:
    • The ResourceManager allocates TaskManager slots on job submission and releases them once the job finishes.
    • Because all jobs are sharing the same cluster, there is some competition for cluster resources — like network bandwidth in the submit-job phase.
    • One limitation of this shared setup is that if one TaskManager crashes, then all jobs that have tasks running on this TaskManager will fail.
    • In a similar way, if some fatal error occurs on the JobManager, it will affect all jobs running in the cluster.
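With a standalone setup, the session-cluster lifecycle looks roughly like this (the example JAR ships with the Flink distribution):

```shell
# Start a long-running session cluster
./bin/start-cluster.sh

# Submit any number of jobs to the running session
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# The cluster keeps running until stopped manually
./bin/stop-cluster.sh
```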
  • Cluster Lifecycle:
    • In a Flink Job Cluster, the available cluster manager (like YARN) spins up a cluster for each submitted job.
    • This cluster is available to that job only.
    • The client first requests resources from the cluster manager to start the JobManager and submits the job to the Dispatcher running inside this process.
    • TaskManagers are then lazily allocated based on the resource requirements of the job.
    • Once the job finishes, the Flink Job Cluster is torn down.
  • Resource Isolation:
    • A fatal error in the JobManager only affects the one job running in that Flink Job Cluster.
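On YARN, such a per-job cluster can be requested at submission time. A sketch (the JAR name is a placeholder; note that per-job mode has been deprecated in recent Flink releases in favor of application mode):

```shell
# Spin up a dedicated YARN cluster for this one job
./bin/flink run -t yarn-per-job ./my-job.jar
```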
  • Cluster Lifecycle:
    • A Flink Application Cluster is a dedicated Flink cluster that only executes jobs from one Flink Application and where the main() method runs on the cluster rather than the client.
    • The job submission is a one-step process: we don’t need to start a Flink cluster first and then submit a job to the existing cluster session.
    • Instead, we package our application logic and dependencies into an executable job JAR, and the cluster entrypoint (ApplicationClusterEntryPoint) is responsible for calling the main() method to extract the JobGraph.
    • This allows us to deploy a Flink Application like any other application on Kubernetes, for example.
    • The lifetime of a Flink Application Cluster is therefore bound to the lifetime of the Flink Application.
  • Resource Isolation:
    • In a Flink Application Cluster, the ResourceManager and Dispatcher are scoped to a single Flink Application.
    • This provides a better separation of concerns than the Flink Session Cluster.
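As a sketch, application mode is triggered with the run-application action; the JAR names and cluster id below are placeholders:

```shell
# Application mode on YARN: main() runs on the cluster, not on the client
./bin/flink run-application -t yarn-application ./my-app.jar

# Application mode natively on Kubernetes (the JAR must be inside the image)
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=my-app-cluster \
  local:///opt/flink/usrlib/my-app.jar
```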


  • In this blog, we discussed the Flink Architecture and its execution options. A Flink job can be executed on a Session Cluster, an Application Cluster, or a Job Cluster.

Written by 

I am a technology enthusiast with 3+ years of experience. I have worked on Core Java, Apache Flink, Apache Beam, AWS, GCP, Kafka, Spark, and MySQL. I am curious about learning new technologies.