Running Apache Airflow DAG with Docker


In this blog, we are going to run a sample dynamic DAG using Docker.

Before that, let’s get a quick overview of Airflow and some of its terms.

What is Airflow?

Airflow is a workflow engine responsible for scheduling and managing jobs and data pipelines. It ensures that jobs run in the correct order based on their dependencies, and it also manages resource allocation and handles failures.

Before going forward, let’s get familiar with the terms:

Task or Operator: A defined unit of work.

Task instance: An individual run of a single task. Its state can be, for example, running, success, failed, skipped, or up for retry.

DAG (Directed Acyclic Graph): A set of tasks with an execution order.

DAG Run: An individual run of a DAG.

Web Server: The Airflow UI. It also lets us manage users, roles, and various configurations of the Airflow setup.

Scheduler: Schedules jobs and orchestrates tasks. It uses the DAG objects to decide which tasks need to run, when, and where.

Executor: Executes the tasks. There are different types of executors: 

  • Sequential: Runs one task instance at a time.
  • Local: Runs task instances in parallel on the same machine by spawning processes in a controlled fashion, in different modes.
  • Celery: Runs tasks through Celery, an asynchronous task queue/job queue based on distributed message passing. For the CeleryExecutor, you need to set up a broker (Redis, RabbitMQ or any other broker supported by Celery) whose queue all running Celery workers keep polling for new tasks to run.
  • Kubernetes: Runs Airflow tasks on Kubernetes, launching a new pod for each task.

Metadata Database: Stores the Airflow state. Airflow uses SQLAlchemy, a Python SQL toolkit and Object Relational Mapper (ORM), to connect to the metadata database.
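The Celery model described above (a broker queue that workers keep polling for new tasks) can be illustrated with a small, self-contained Python sketch. It is a toy model of the pattern only, not how Airflow or Celery are implemented: the standard library's queue.Queue stands in for the broker, threads stand in for workers, and run_with_workers is a hypothetical helper.

```python
import queue
import threading


def run_with_workers(tasks, num_workers=2):
    """Toy model: worker threads poll a shared queue (the 'broker') for tasks.

    tasks maps a task id to a zero-argument callable; returns {task_id: result}.
    """
    broker = queue.Queue()
    for task_id, fn in tasks.items():
        broker.put((task_id, fn))  # stand-in for the executor sending work to the broker

    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            try:
                # Poll the broker; stop once nothing arrives within 0.1 s.
                task_id, fn = broker.get(timeout=0.1)
            except queue.Empty:
                return
            value = fn()
            with lock:
                results[task_id] = value

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    print(run_with_workers({"a": lambda: 1, "b": lambda: 2, "c": lambda: 3}))
```

In a real Celery setup the queue lives in Redis or RabbitMQ and the workers are separate processes, often on separate machines, which is what lets the CeleryExecutor scale out.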

Now that we are familiar with the terms, let’s get started.

If you run into a permission issue at any point, execute:

chmod 777 <file>
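Note that chmod 777 makes a file readable, writable and executable by everyone. For the entrypoint script, adding the execute bits is normally enough. The same thing can be done from Python with os.chmod, as in this minimal sketch (make_executable is a hypothetical helper name):

```python
import os
import stat


def make_executable(path):
    """Add execute permission for user, group and others (like `chmod a+x`)."""
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
```

For example, make_executable("scripts/airflow-entrypoint.sh") turns a 644 file into a 755 one, without granting write access to everyone.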

Let’s create a scripts folder and, inside it, a script called airflow-entrypoint.sh that initializes the metadata database and then starts the webserver.

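#!/usr/bin/env bash
set -e  # stop (and let the container restart) if the database step fails
# Create or upgrade the metadata database schema (Airflow 1.10.x CLI; Airflow 2.x renamed it to "airflow db upgrade").
airflow upgradedb
# Run the webserver in the foreground; exec lets it receive the container's stop signals.
exec airflow webserver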

scripts/airflow-entrypoint.sh
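depends_on only waits for the postgres container to start, not for PostgreSQL to accept connections, so airflow upgradedb can occasionally run too early. One way to hedge against this is to wait for the database port before initializing. Below is a minimal Python sketch of such a wait loop; wait_for_port is a hypothetical helper, and a successful TCP connection is a reasonable but not perfect readiness signal.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Connection succeeded: something is listening on host:port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: wait a bit and try again.
            time.sleep(interval)
    return False
```

Inside the compose network, the database would be reached as wait_for_port("postgres", 5432), using the service name and the container port.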

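Let’s now create the docker-compose file. The CLI commands and imports in this post follow Airflow 1.10.x, so it is safer to pin the apache/airflow image to a 1.10.x tag than to rely on the untagged (latest) image: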

docker-compose.yml

version: "2.1"
services:
  postgres:
    image: postgres:12
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5433:5432"

  scheduler:
    image: apache/airflow
    restart: always
    depends_on:
      - postgres
      - webserver
    env_file:
      - .env
    ports:
      - "8793:8793"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./airflow-logs:/opt/airflow/logs
    command: scheduler

  webserver:
    image: apache/airflow
    hostname: webserver
    restart: always
    depends_on:
      - postgres
    env_file:
      - .env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./airflow-logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
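The compose file publishes the container’s PostgreSQL port 5432 on port 5433 of the host ("5433:5432"). Containers on the compose network reach the database through the service name as postgres:5432, while tools running on the host use localhost:5433. A small sketch that builds the SQLAlchemy URL for either case; postgres_url and its defaults are illustrative assumptions matching the credentials used in this post.

```python
from urllib.parse import quote


def postgres_url(host, port, user="airflow", password="airflow", db="airflow"):
    """Build a SQLAlchemy URL for PostgreSQL using the psycopg2 driver."""
    # Percent-encode credentials so characters like '@' or ':' don't break the URL.
    return "postgresql+psycopg2://{}:{}@{}:{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port, db
    )


# From another container on the compose network: service name + container port.
IN_NETWORK = postgres_url("postgres", 5432)
# From the host machine: the published host port.
FROM_HOST = postgres_url("localhost", 5433)
```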

Now, create a .env file for environment variables.

.env

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR_METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=10
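The variable names in this file follow Airflow’s conventions: AIRFLOW__{SECTION}__{KEY} overrides option KEY in section [SECTION] of airflow.cfg, AIRFLOW_CONN_{CONN_ID} defines a connection, and AIRFLOW_VAR_{KEY} defines a Variable. The sketch below (hypothetical helper names) classifies such names. It also shows one way to generate your own Fernet key instead of reusing the published one: a Fernet key is 32 random bytes, URL-safe base64-encoded, which is the same format that Fernet.generate_key() from the cryptography package produces.

```python
import base64
import os


def classify_env_var(name):
    """Map an Airflow-style environment variable name to what it configures."""
    if name.startswith("AIRFLOW__"):
        # AIRFLOW__{SECTION}__{KEY} -> option KEY in section [SECTION]
        section, _, key = name[len("AIRFLOW__"):].partition("__")
        return ("config", section.lower(), key.lower())
    if name.startswith("AIRFLOW_CONN_"):
        return ("connection", name[len("AIRFLOW_CONN_"):].lower())
    if name.startswith("AIRFLOW_VAR_"):
        return ("variable", name[len("AIRFLOW_VAR_"):].lower())
    return ("other", name)


def generate_fernet_key():
    """Return 32 random bytes, URL-safe base64-encoded (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")
```

For example, classify_env_var("AIRFLOW_VAR__METADATA_DB_SCHEMA") yields the Variable key _metadata_db_schema, which is why a single underscore must follow VAR for the key to be metadata_db_schema.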

Finally, create a folder called dags containing a file called hello-airflow.py.

hello-airflow.py

import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    # Arguments applied to every task (operator) in the DAG.
    default_args = {
        "owner": "jyoti",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }

    new_dag = DAG(
        dag_id,
        default_args=default_args,
        description="DAG to explain airflow concepts",  # a DAG argument, not a task argument
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        logger.info('=====Executing Task 1=============')
        # The return value is pushed to XCom automatically.
        return kwargs['message']

    def task_2(**kwargs):
        logger.info('=====Executing Task 2=============')
        task_instance = kwargs['ti']
        # Pull the value returned by Task_1.
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(
            task_id='Task_1',
            python_callable=task_1,
            op_kwargs={'message': 'hello airflow'},
            provide_context=True,
        )

        task2 = PythonOperator(
            task_id='Task_2',
            python_callable=task_2,
            provide_context=True,
        )

        # Task_2 runs only after Task_1 has succeeded.
        task2.set_upstream(task1)

    return new_dag


# Expose the DAG at module level so that Airflow can discover it.
dag_id = "hello_airflow1"
globals()[dag_id] = create_dag(dag_id)
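The globals() assignment is what makes the DAG "dynamic": Airflow’s DagBag collects the DAG objects it finds at module level in files under the dags folder, so a factory plus a loop can register several DAGs from one file. The sketch below shows only the registration pattern. make_dag is a hypothetical stand-in for create_dag so that the snippet runs without Airflow installed, and the extra DAG ids are illustrative.

```python
from types import SimpleNamespace


def make_dag(dag_id):
    # Hypothetical stand-in for create_dag(dag_id); the real factory returns an airflow DAG.
    return SimpleNamespace(dag_id=dag_id)


# Register one module-level object per id, as the DAG file does for "hello_airflow1".
for n in range(1, 4):
    dag_id = "hello_airflow{}".format(n)
    globals()[dag_id] = make_dag(dag_id)
```

With the real create_dag, this loop would make hello_airflow1, hello_airflow2 and hello_airflow3 appear as three separate DAGs in the UI.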

We have created two tasks (operators) here. Task 2 executes once task 1 has completed successfully, as defined by task2.set_upstream(task1) (equivalent to task1 >> task2).
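set_upstream only records an edge in the graph. The ordering that follows from such edges can be illustrated with the standard library’s graphlib (Python 3.9+), which performs a plain topological sort. This is an illustration of the concept, not Airflow’s scheduler. Task_1 and Task_2 come from the DAG above; Task_3 is a hypothetical extra task added to show a fan-in.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task_2 depends on Task_1, as in task2.set_upstream(task1).
# Task_3 is hypothetical: it waits for both Task_1 and Task_2.
dependencies = {
    "Task_2": {"Task_1"},
    "Task_3": {"Task_1", "Task_2"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['Task_1', 'Task_2', 'Task_3']

# A cycle makes the tasks impossible to order, which is why a DAG must be acyclic.
try:
    list(TopologicalSorter({"A": {"B"}, "B": {"A"}}).static_order())
except CycleError:
    print("cycle detected")
```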

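Task 1 passes data to task 2 through XCom: the value returned by a PythonOperator’s callable is pushed to XCom automatically. Setting provide_context=True makes Airflow pass the task context (including the task instance, ti) to the callable as keyword arguments, which is what lets task 2 pull the value using the task instance and the task id: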

task_instance.xcom_pull(key=None, task_ids='Task_1')
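Conceptually, XCom is a key-value store kept in the metadata database, and a PythonOperator’s return value is stored under the key return_value. The toy model below mimics that hand-off in plain Python to show the data flow. ToyXCom is hypothetical and greatly simplified (no DAG ids, execution dates or database).

```python
class ToyXCom:
    """Hypothetical in-memory stand-in for Airflow's XCom storage."""

    def __init__(self):
        self._store = {}  # (task_id, key) -> value, kept in push order

    def push(self, task_id, key, value):
        self._store.pop((task_id, key), None)  # re-insert so the newest push comes last
        self._store[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # key=None: return the most recently pushed value from task_ids, whatever its key.
        matches = [v for (tid, k), v in self._store.items()
                   if tid == task_ids and (key is None or k == key)]
        return matches[-1] if matches else None


xcom = ToyXCom()


def task_1(message):
    return message


# What the operator does for Task_1: run the callable, then push its return value.
xcom.push("Task_1", "return_value", task_1("hello airflow"))

# What task_2 does: pull by task id.
result = xcom.pull(task_ids="Task_1", key=None)
print(result)  # hello airflow
```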

That’s all! Now run docker-compose -f docker-compose.yml up and open the Airflow UI at http://localhost:8080.
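The containers may need a little while before the UI answers. One way to wait for it is to poll the webserver’s /health endpoint, which recent Airflow versions expose and which returns JSON describing the metadatabase and scheduler status. The sketch below only checks for an HTTP 200 response; wait_for_http and its defaults are assumptions.

```python
import time
import urllib.error
import urllib.request


def wait_for_http(url, timeout=120.0, interval=2.0):
    """Poll url until it returns HTTP 200; return True on success, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # not up yet: connection refused, HTTP error, timeout, ...
        time.sleep(interval)
    return False
```

For this setup, the call would be wait_for_http("http://localhost:8080/health").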

Thanks for reading!!!

Written by 

Jyoti Sachdeva is a software consultant with more than 6 months of experience. She likes to keep up with trending technologies. She is familiar with languages such as C, C++, Java and Scala, and is currently working on Akka, Akka HTTP and Scala. Her hobbies include watching TV series and movies, reading novels and dancing.