Apache Airflow: Connect with Kubernetes Cluster

Reading Time: 4 minutes

What is Airflow?

  • Airflow is a free, open-source workflow orchestration framework developed under the Apache Software Foundation and used to manage workflows.
  • It is one of the most popular and best-regarded workflow management systems available, with great community support.

What are operators and why do we need them?

  • In practice, Airflow DAGs (Directed Acyclic Graphs) only represent the workflow; they do not perform any computation themselves. To carry out actual operations we need operators. Operators act as placeholders that define what operation each task should perform. For more information on Airflow operators, please read this blog.
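To make the placeholder idea concrete, here is a minimal toy sketch in plain Python (these classes are illustrative, not the real Airflow API): the workflow only describes tasks, and each operator's execute() method is what performs the actual work.

```python
# Toy sketch of the operator concept (not the real Airflow classes):
# the workflow only *describes* tasks; execute() does the real work.
class ToyOperator:
    """A placeholder in the workflow; subclasses define the operation."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self):
        raise NotImplementedError


class PrintOperator(ToyOperator):
    """An operator whose 'operation' is simply printing a message."""

    def __init__(self, task_id, message):
        super().__init__(task_id)
        self.message = message

    def execute(self):
        print(f"[{self.task_id}] {self.message}")
        return self.message


# Building the workflow does no work; calling execute() does.
task = PrintOperator(task_id="greet", message="hello world")
result = task.execute()  # prints "[greet] hello world"
```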

Why use Airflow with Kubernetes?

  • From the beginning, Airflow has been known for its flexibility. It provides integrations for services from various cloud providers and for platforms like Spark, and its plug-in framework makes it very simple to extend.
  • One drawback, however, is that Airflow users are limited to the clients and frameworks installed on the Airflow worker at execution time; they have no other choice.
  • A company usually has multiple workflows for different purposes, such as data science pipelines and production applications. These differing purposes lead to dependency-management problems, because the teams involved naturally use different libraries for their different use cases.
  • Kubernetes solves this problem by allowing users to create and launch pods with their own Kubernetes configurations.
  • Once their pods are set up, Airflow users have full control over resources, run environments, and secrets, making Airflow more powerful and allowing them to run any job they want through Airflow workflows.

The Kubernetes Operator:

  • Airflow already has operators for different frameworks like BigQuery, Apache Spark, and Hive. It also allows developers to build their own connectors for other requirements.
  • Developers who use Airflow consistently try to make deployments and ETL (Extract, Transform and Load) pipelines easier to manage. To achieve this, developers and DevOps engineers are always looking for ways to decouple pipeline steps and increase monitoring, reducing the risk of future outages and fire-fights (emergency allocation of resources due to unexpected scenarios like extra stress or load).

Using Airflow with Kubernetes will help with the following:

More flexibility when deploying applications:

  • Airflow's plugin API offers great support when engineers want to test new features in their DAGs. However, whenever a developer wrote a new operator, they had to build a whole new plugin.
  • Now, with Airflow on Kubernetes, they can run any task within a Docker container using the exact same operator, without worrying about extra Airflow code to maintain.

More flexibility for dependencies and configurations:

  • For operators running on static Airflow workers, dependencies become cumbersome to manage. For example, take a hypothetical scenario in which an Airflow developer uses SciPy (a Python library) for task A, while task B requires NumPy (another Python library). The developer would need either to install both dependencies on all Airflow workers (not an easy task) or to offload the tasks to an external machine (which may cause issues if something unexpected happens to that machine).
  • Custom (Docker) images allow Airflow users to be sure that everything works as expected: the task environment, dependencies, and configurations.
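The SciPy/NumPy scenario above can be sketched with two KubernetesPodOperator tasks, each pinned to its own image. The image names here are hypothetical, and this fragment only runs inside an Airflow deployment (with a `dag` object defined as in the demo code later in this post) that can reach a Kubernetes cluster:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Hypothetical images: each task gets exactly the dependencies it needs,
# so the Airflow workers themselves stay dependency-free.
task_a = KubernetesPodOperator(
    namespace='default',
    image='my-registry/scipy-task:latest',   # image with SciPy pre-installed
    cmds=['python', '-c'],
    arguments=["import scipy; print(scipy.__version__)"],
    name='scipy-task',
    task_id='task-a',
    get_logs=True,
    dag=dag,
)

task_b = KubernetesPodOperator(
    namespace='default',
    image='my-registry/numpy-task:latest',   # image with NumPy pre-installed
    cmds=['python', '-c'],
    arguments=["import numpy; print(numpy.__version__)"],
    name='numpy-task',
    task_id='task-b',
    get_logs=True,
    dag=dag,
)
```

Because each task runs in its own container, neither library ever needs to be installed on the Airflow workers.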

To make Airflow more secure:

  • The security of sensitive data (like credentials and client details) is a main concern of any development team, so passwords, API keys, and other important credentials need to be isolated.
  • With the Kubernetes Operator, the responsible team can use Kubernetes secret handling (for example, backed by a tool such as Vault) to store any sensitive data.
  • With this in place, only the tasks and people that require the sensitive data have access to it (unlike previously, when all workers usually had access).
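As a sketch of how this looks in a DAG, a Kubernetes secret can be exposed to a single pod as an environment variable, so only that pod ever sees the value. The secret name and key below are hypothetical, and the fragment assumes an Airflow deployment with a `dag` object and a Kubernetes cluster:

```python
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Expose key 'api-key' from the (hypothetical) Kubernetes secret
# 'my-credentials' to the pod as the env var API_KEY.
api_key = Secret(
    deploy_type='env',
    deploy_target='API_KEY',
    secret='my-credentials',
    key='api-key',
)

secure_task = KubernetesPodOperator(
    namespace='default',
    image='python:3.6',
    cmds=['python', '-c'],
    arguments=["import os; print('key is set:', 'API_KEY' in os.environ)"],
    secrets=[api_key],        # only this pod receives the credential
    name='secure-task',
    task_id='secure-task',
    get_logs=True,
    dag=dag,
)
```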

Architecture:

  • The Kubernetes Operator uses the Python client for Kubernetes to create a request, which is then processed by the APIServer (1).
  • The desired pod is then launched according to the defined specifications (2).
  • The necessary image(s) can be loaded according to the defined parameters with a single command. After the job launches, the operator only monitors the health of the tracked logs. However, users can choose to gather logs locally on the scheduler or use any log distribution service already present in the Kubernetes cluster.
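The first two steps can be sketched directly with the Kubernetes Python client, which is roughly what the operator does under the hood. The pod name and image are illustrative, and running this requires a reachable cluster and a valid kubeconfig:

```python
from kubernetes import client, config

# Load credentials from the local kubeconfig (in-cluster config is also possible).
config.load_kube_config()

# (1) Build the pod specification that will be sent to the APIServer.
pod = client.V1Pod(
    metadata=client.V1ObjectMeta(name='demo-pod', labels={'foo': 'bar'}),
    spec=client.V1PodSpec(
        restart_policy='Never',
        containers=[
            client.V1Container(
                name='main',
                image='python:3.6',
                command=['python', '-c', "print('hello world')"],
            )
        ],
    ),
)

# (2) Ask the APIServer to launch the pod; Kubernetes then schedules and runs it.
v1 = client.CoreV1Api()
v1.create_namespaced_pod(namespace='default', body=pod)
```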

Example of Airflow using a Kubernetes Operator:

  • The following DAG will create two Kubernetes pods:
    • a Linux distro with Python
    • an Ubuntu distro without Python
  • The pod with Python will run a Python command successfully, while the pod without Python will report a failure back to the Airflow user.
  • If everything works as expected, the "passing-task" pod will complete and the "failing-task" pod will report an error to the Airflow web server.

Demo-Code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}

dag = DAG(
    'kubernetes_sample',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10),
)

start = DummyOperator(task_id='run_this_first', dag=dag)

# This pod's image ships with Python, so the task succeeds.
passing = KubernetesPodOperator(
    namespace='default',
    image='python:3.6',
    cmds=['python', '-c'],
    arguments=["print('hello world')"],
    labels={'foo': 'bar'},
    name='passing-test',
    task_id='passing-task',
    get_logs=True,
    dag=dag,
)

# This pod's image has no Python installed, so the task fails
# and the failure is reported back to Airflow.
failing = KubernetesPodOperator(
    namespace='default',
    image='ubuntu:16.04',
    cmds=['python', '-c'],
    arguments=["print('hello world')"],
    labels={'foo': 'bar'},
    name='fail',
    task_id='failing-task',
    get_logs=True,
    dag=dag,
)

passing.set_upstream(start)
failing.set_upstream(start)

Conclusion:

  • Apache Airflow with Kubernetes makes the job of DevOps engineers and developers easier by providing a simpler way to maintain pipelines while adding security to the application.
  • It also provides an easier way to manage all the dependencies and configurations.
  • Curious to learn more about these awesome tools? Please visit the official Airflow documentation and the official Kubernetes documentation.

For more Airflow and other tech blogs, please visit Knoldus Blogs.
