Apache Airflow: Writing your first pipeline

Reading Time: 3 minutes

Before jumping into the code, you need to understand what an Airflow DAG is all about. It is important, so stay with me.

What is an Airflow DAG?

DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes, directed edges and no cycles. Basically, this is a DAG:

[Figure: a simple directed acyclic graph, with nodes connected by directed edges and no cycles]
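
For intuition, here is a tiny sketch (plain Python, not Airflow code, with hypothetical task names) of the same idea: tasks are nodes, dependencies are directed edges, and following the edges never leads back to where you started.

# A hypothetical DAG written as an adjacency mapping:
# each key is a task, each value is the list of tasks that run after it.
dag_edges = {
    'extract': ['transform'],
    'transform': ['load', 'notify'],
    'load': [],
    'notify': [],
}
# No path through these edges ever revisits a task,
# which is exactly what "acyclic" means.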

We will learn step by step how to write your first DAG.

Steps to write an Airflow DAG

  • A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.
  • There are only 5 steps you need to remember to write an Airflow DAG or workflow:
  1. Importing Modules
  2. Default Arguments
  3. Instantiate a DAG
  4. Task
  5. Setting up dependencies

Step 1: Importing Modules

  • Import Python dependencies needed for the workflow
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

Step 2: Default Arguments

  • Define default arguments that are applied to every task in the DAG (they can be overridden per task)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(2018, 12, 30),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Step 3: Instantiate a DAG

  • Give the DAG a name, configure the schedule, and set the DAG settings
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    # Continue to run DAG once per day
    schedule_interval=timedelta(days=1),
)

Here are a couple of options you can use for schedule_interval. You can choose either a preset or a cron-like expression:

preset      meaning                                                          cron
None        Don’t schedule; use for exclusively “externally triggered” DAGs
@once       Schedule once and only once
@hourly     Run once an hour at the beginning of the hour                    0 * * * *
@daily      Run once a day at midnight                                       0 0 * * *
@weekly     Run once a week at midnight on Sunday morning                    0 0 * * 0
@monthly    Run once a month at midnight of the first day of the month       0 0 1 * *
@yearly     Run once a year at midnight of January 1                         0 0 1 1 *

Example usage

  • Daily schedule (the preset and the cron expression below are equivalent)
schedule_interval='@daily'
schedule_interval='0 0 * * *'

Step 4: Tasks

  • The next step is to lay out all the tasks in the workflow.
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)

# Airflow leverages Jinja templating: {{ ds }} is the execution date,
# macros.ds_add adds days to it, and params are passed in below.
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

Step 5: Setting up Dependencies

  • Set the dependencies, i.e. the order in which the tasks should be executed.
  • Here are a few ways you can define dependencies between them:
# This means that t2 will depend on t1
# running successfully to run.
  t1.set_downstream(t2)

# similar to above where t3 will depend on t1
  t3.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations: 
  t1 >> t2

# And the upstream dependency with the 
# bit shift operator:
  t2 << t1

# A list of tasks can also be set as
# dependencies. These operations 
# all have the same effect:
  t1.set_downstream([t2, t3])
  t1 >> [t2, t3]
  [t2, t3] << t1
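
Putting the five steps together, here is a minimal sketch of what the complete DAG file could look like (assuming the Airflow 1.x import path airflow.operators.bash_operator used above; newer versions move BashOperator elsewhere):

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='@daily',
)

# Two simple tasks: print the date, then sleep for 5 seconds
t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
t2 = BashOperator(task_id='sleep', bash_command='sleep 5', dag=dag)

# t2 runs only after t1 has succeeded
t1 >> t2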

Summary

  • Basically, a DAG is just a Python file, which is used to organize tasks and set their execution context. DAGs do not perform any actual computation.
  • Instead, tasks are the elements of Airflow that actually “do the work” we want performed. It is your job to write the configuration and organize the tasks in a specific order to create a complete data pipeline.
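  • If you want to sanity-check the file before Airflow picks it up, you can simply run it through the Python interpreter (for example, python tutorial.py from your DAGs folder); if it exits without raising an exception, the DAG definition is at least syntactically valid.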

Conclusion

In this blog, you learned how to write your first DAG. Stay tuned for upcoming blogs that will cover these topics in more detail.

https://airflow.apache.org/