Before jumping into the code, you need to understand what an Airflow DAG is all about. This is important, so stay with me.
What is an Airflow DAG?
DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes and directed edges and no cycles: following the edges from any node never leads back to that same node. Basically, this is a DAG:
You will learn step by step how to write your first DAG.
Steps to write an Airflow DAG
A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.
There are only 5 steps you need to remember to write an Airflow DAG or workflow (a minimal skeleton that previews how they fit together follows this list):
Importing Modules
Default Arguments
Instantiate a DAG
Task
Setting up dependencies
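Before going through each step in detail, here is a minimal sketch of how the five pieces fit together in a single file. The dag_id 'skeleton_example' and the task names are made up for illustration only; the real tutorial DAG is built step by step below.
# Step 1: importing modules
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Step 2: default arguments shared by every task
default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
}

# Step 3: instantiate the DAG ('skeleton_example' is a made-up dag_id)
dag = DAG(
    'skeleton_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# Step 4: tasks (hypothetical task names)
hello = BashOperator(task_id='hello', bash_command='echo hello', dag=dag)
goodbye = BashOperator(task_id='goodbye', bash_command='echo goodbye', dag=dag)

# Step 5: dependencies (run 'hello' before 'goodbye')
hello >> goodbye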
Step 1: Importing Modules
Import Python dependencies needed for the workflow
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
Step 2: Default Arguments
These arguments are passed to each of the DAG’s tasks and can be overridden on a per-task basis:
default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(2018, 12, 30),
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
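Any of these defaults can be overridden on an individual task. As a quick sketch (the task name here is hypothetical, and it assumes the dag object created in Step 3 below), a task that should retry three times instead of once would look like this:
flaky_task = BashOperator(
    task_id='flaky_step',    # hypothetical task name
    bash_command='exit 0',
    retries=3,               # overrides 'retries': 1 from default_args
    dag=dag,
)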
Step 3: Instantiate a DAG
Give the DAG a name, configure the schedule, and set the DAG settings:
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    # Continue to run DAG once per day
    schedule_interval=timedelta(days=1),
)
Here are a couple of options you can use for your schedule_interval. You can choose a preset argument or a cron-like argument:
preset   | meaning                                                      | cron
None     | Don’t schedule, use for exclusively “externally triggered” DAGs |
@once    | Schedule once and only once                                  |
@hourly  | Run once an hour at the beginning of the hour                | 0 * * * *
@daily   | Run once a day at midnight                                   | 0 0 * * *
@weekly  | Run once a week at midnight on Sunday morning                | 0 0 * * 0
@monthly | Run once a month at midnight of the first day of the month   | 0 0 1 * *
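For example, the timedelta(days=1) used above could be replaced with the @daily preset or the equivalent cron expression. A quick sketch (the dag_ids here are made up for illustration):
dag_preset = DAG(
    'tutorial_daily_preset',        # hypothetical dag_id
    default_args=default_args,
    schedule_interval='@daily',     # preset
)

dag_cron = DAG(
    'tutorial_daily_cron',          # hypothetical dag_id
    default_args=default_args,
    schedule_interval='0 0 * * *',  # equivalent cron expression
)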
Step 4: Task
The next step is to lay out all the tasks in the workflow.
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)
Airflow leverages Jinja templating: the command below echoes the execution date (ds), the date seven days later, and a user-defined parameter passed in through params.
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
Step 5: Setting up Dependencies
Set the dependencies, i.e. the order in which the tasks should be executed.
Here are a few ways you can define dependencies between them:
# This means that t2 will depend on t1
# running successfully to run.
t1.set_downstream(t2)
# similar to above where t3 will depend on t1
t3.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
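At this point the pipeline is complete. As a quick sanity check (a sketch, assuming the file above is saved as tutorial.py and is importable), you can load the DAG and inspect its structure directly from Python:
# A minimal sketch: import the DAG defined above and inspect it.
# Assumes the DAG file is saved as tutorial.py on the Python path.
from tutorial import dag, t1

print(dag.dag_id)                      # 'tutorial'
print([t.task_id for t in dag.tasks])  # the three task_ids defined above
print(t1.downstream_task_ids)          # tasks set to run after 'print_date'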
Summary
Basically, a DAG is just a Python file, which is used to organize tasks and set their execution context. DAGs do not perform any actual computation.
Instead, tasks are the elements of Airflow that actually “do the work” we want to perform. It is your job to write the configuration and organize the tasks in a specific order to create a complete data pipeline.
Conclusion
In this blog, you learned how to write your first Airflow DAG. Stay tuned for upcoming blogs that cover these topics in more detail.