If you are reading this blog, I assume you are already familiar with the Apache Airflow basics. If not, please visit “Introduction to Apache-Airflow”.
Before proceeding further, let’s understand what a DAG is.
What is a DAG?
DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes, directed edges, and no cycles.
In the above example, the first graph is a DAG while the second graph is NOT a DAG, because it contains a cycle (Node A → Node B → Node C → Node A).
In Apache Airflow, a “DAG” means a “data pipeline”. It is authored using the Python programming language. Whenever a DAG is triggered, a DAGRun is created. A DAGRun is an instance of the DAG with an execution date in Airflow.
Steps for writing a DAG file:
- Importing Modules
- Defining default arguments
- Instantiating the DAG
- Defining the tasks
- Defining dependencies
Step 1: Importing modules
Import Python dependencies needed for the workflow.
To create a DAG in Airflow, you always have to import the DAG class, i.e. from airflow import DAG.
The next imports are the operators you plan to use, such as BashOperator, PythonOperator, BranchPythonOperator, etc.
Example:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
Step 2: Defining default arguments
This step defines the default and DAG-specific arguments. If a dictionary of default_args is passed to a DAG, it applies them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.
Example:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # The options below are shown for reference only; the callback functions
    # and the dag object would have to be defined elsewhere before use.
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
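To make the inheritance behaviour concrete, here is a small hedged sketch (the dag_id, dates, and task names are assumptions): every operator in the DAG picks up values from default_args unless the same argument is set on the operator itself.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5)}

with DAG(
    dag_id="default_args_demo",        # assumed name for this illustration
    start_date=datetime(2022, 1, 1),   # assumed start date
    schedule_interval=None,
    default_args=default_args,
) as dag:
    # Inherits owner, retries and retry_delay from default_args
    inherits_defaults = BashOperator(
        task_id="inherits_defaults",
        bash_command="echo 'uses the defaults'",
    )

    # A per-task argument overrides the default: this task retries 3 times
    overrides_defaults = BashOperator(
        task_id="overrides_defaults",
        bash_command="echo 'overrides retries'",
        retries=3,
    )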
Step 3: Instantiating the DAG
Give the DAG a unique name, configure the schedule, and set the other DAG settings.
Here are a couple of options you can use for your schedule_interval. You can choose a preset argument or a cron-like argument:
| Preset | Meaning | Cron |
|---|---|---|
| None | Don’t schedule; use for exclusively “externally triggered” DAGs | |
| @once | Schedule once and only once | |
| @hourly | Run once an hour at the beginning of the hour | 0 * * * * |
| @daily | Run once a day at midnight | 0 0 * * * |
| @weekly | Run once a week at midnight on Sunday morning | 0 0 * * 0 |
| @monthly | Run once a month at midnight on the first day of the month | 0 0 1 * * |
| @yearly | Run once a year at midnight of January 1 | 0 0 1 1 * |
Example:
- Monthly schedule:
  - schedule_interval='@monthly'
  - schedule_interval='0 0 1 * *'
- Hourly schedule:
  - schedule_interval='@hourly'
  - schedule_interval='0 * * * *'
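Putting Steps 1–3 together, the instantiation itself might look like the following minimal sketch (the dag_id, description, start date, and catchup=False are assumptions for illustration):

with DAG(
    dag_id="my_first_dag",                # must be unique across your Airflow instance
    default_args=default_args,            # the dictionary from Step 2
    description="A simple example DAG",   # assumed description
    start_date=datetime(2022, 1, 1),      # assumed start date
    schedule_interval="@daily",           # preset; the cron string "0 0 * * *" is equivalent
    catchup=False,                        # do not backfill runs between start_date and now
) as dag:
    # the tasks from Step 4 are defined inside this block
    ...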
Step 4: Defining the tasks
The next step is to lay out all the tasks in the workflow.
Each node in the DAG represents a task. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them to express the order in which they should run.
There are three basic kinds of tasks:
- Operators – predefined task templates that you can string together quickly to build most parts of your DAGs.
- Sensors – a special subclass of Operators which are entirely about waiting for an external event to happen.
- TaskFlow – a custom Python function decorated with @task and packaged up as a Task (a short TaskFlow sketch follows the operator example below).
Example:
# Placeholder callables, assumed for illustration
def method_1():
    print("This is Task 1")

def method_2():
    # A branch callable must return the task_id of a direct downstream task
    return "task_3"

task_1 = PythonOperator(
    task_id="task_1",
    python_callable=method_1,
)

task_2 = BranchPythonOperator(
    task_id="task_2",
    python_callable=method_2,
)

task_3 = BashOperator(
    task_id="task_3",
    bash_command="echo 'This is Task 3'",
)
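TaskFlow is listed above but not shown in the example, so here is a minimal hedged sketch of the decorator style (the function name extract_data is an assumption):

from airflow.decorators import task

@task
def extract_data():
    # This plain Python function becomes an Airflow task named "extract_data"
    return {"records": 42}

Calling extract_data() inside a DAG (or inside a function decorated with @dag) creates the corresponding task.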
Step 5: Defining dependencies
The key part of using Tasks is defining how they relate to each other – their dependencies, or, as we say in Airflow, their upstream and downstream tasks. First declare your Tasks, and then declare their dependencies.
There are two ways of declaring dependencies – using the >> and << (bitshift) operators:
first_task >> second_task >> [third_task, fourth_task]
Or the more explicit set_downstream and set_upstream methods:
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
Example:
- task_2 and task_3 depend on task_1
- task_1.set_downstream([task_2, task_3])
- task_1 >> [task_2, task_3]
- [task_2, task_3] << task_1
Code
The screenshot attached below shows a complete example of DAG creation in Apache Airflow.
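Since the screenshot itself is not reproduced here, the following is a minimal end-to-end sketch that assembles the steps above (the dag_id, the dates, the placeholder callables, and the fourth task are assumptions; the branch is given two downstream Bash tasks so it has something to choose between):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Placeholder callables, assumed for illustration
def method_1():
    print("This is Task 1")

def method_2():
    # Choose which downstream task to run by returning its task_id
    return "task_3"

with DAG(
    dag_id="my_first_dag",            # assumed name; must be unique
    default_args=default_args,
    start_date=datetime(2022, 1, 1),  # assumed start date
    schedule_interval="@daily",
    catchup=False,
) as dag:
    task_1 = PythonOperator(task_id="task_1", python_callable=method_1)
    task_2 = BranchPythonOperator(task_id="task_2", python_callable=method_2)
    task_3 = BashOperator(task_id="task_3", bash_command="echo 'This is Task 3'")
    task_4 = BashOperator(task_id="task_4", bash_command="echo 'This is Task 4'")

    # task_2 branches between task_3 and task_4
    task_1 >> task_2 >> [task_3, task_4]

When a file like this is placed in the dags folder, the scheduler picks it up and the DAG appears in the UI as described below.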
Viewing DAG in Airflow
After running the code, open localhost:8080 in your browser.
You will see the various DAGs, including those already created by Airflow.
Click on your DAG.
After clicking, you will get a detailed view of the tasks.
You can also check the Graph View for a better visualization of the tasks and their dependencies.
Similarly, you can check other views such as Calendar, Task Duration, Gantt, Code, etc.
I hope you are now able to understand DAG creation in Apache Airflow. Stay tuned for the next part.
Read the Apache Airflow documentation for more knowledge.
To gain more information, visit Knoldus Blogs.