Creating DAG in Apache Airflow

Reading Time: 4 minutes

If you are reading this blog, I assume you are already familiar with the basics of Apache Airflow. If not, please visit “Introduction to Apache-Airflow”.
Before proceeding further, let’s understand what a DAG is.

What is a DAG?

DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes, directed edges, and no cycles.

In the above example, the first graph is a DAG, while the second graph is NOT a DAG because it contains a cycle (Node A → Node B → Node C → Node A).

In Apache Airflow, a “DAG” represents a data pipeline and is authored in Python. Whenever a DAG is triggered, a DAGRun is created. A DAGRun is an instance of the DAG with an execution date.

Steps for writing a DAG file:

  • Importing Modules
  • Defining default arguments
  • Instantiating the DAG
  • Defining the tasks
  • Defining dependencies

Step 1: Importing modules

Import the Python dependencies needed for the workflow.
To create a DAG in Airflow, you always have to import the DAG class, i.e. from airflow import DAG.
The next imports are for the operators you plan to use, such as BashOperator, PythonOperator, BranchPythonOperator, etc.

Example:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

Step 2: Defining default arguments

This step defines the default and DAG-specific arguments. If a dictionary of default_args is passed to a DAG, it applies those arguments to all of the DAG’s operators. This makes it easy to apply a common parameter to many operators without having to repeat it for each one.

Example

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'bash_queue',
    'pool': 'backfill',
    'priority_weight': 10,
    'end_date': datetime(2016, 1, 1),
    'wait_for_downstream': False,
    # 'dag': dag,  # usually omitted; operators are attached to a DAG when created inside it
    'sla': timedelta(hours=2),
    'execution_timeout': timedelta(seconds=300),
    # The callbacks below are placeholders; uncomment them and point them at your own functions.
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    'trigger_rule': 'all_success'
}
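
As a quick illustration of how these defaults propagate, here is a minimal sketch (the dag_id, dates, and tasks are illustrative assumptions, not part of the original example): every operator created inside the DAG inherits the default_args, and an explicit argument on an operator overrides the inherited value.

with DAG(
    dag_id="default_args_demo",        # hypothetical name for illustration
    default_args=default_args,
    start_date=datetime(2015, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Inherits retries=1 and retry_delay from default_args
    t1 = BashOperator(task_id="t1", bash_command="echo 'hello'")
    # An explicit argument overrides the inherited default
    t2 = BashOperator(task_id="t2", bash_command="echo 'world'", retries=3)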

Step 3: Instantiating the DAG

Give the DAG a unique name (its dag_id), configure the schedule, and set the other DAG-level settings (a small instantiation sketch follows the schedule examples below).

Here are a couple of options you can use for schedule_interval. You can choose a preset or a cron-like expression:

Preset   | Meaning                                                          | Cron
None     | Don’t schedule; use for exclusively “externally triggered” DAGs  |
@once    | Schedule once and only once                                      |
@hourly  | Run once an hour at the beginning of the hour                    | 0 * * * *
@daily   | Run once a day at midnight                                       | 0 0 * * *
@weekly  | Run once a week at midnight on Sunday morning                    | 0 0 * * 0
@monthly | Run once a month at midnight of the first day of the month       | 0 0 1 * *
@yearly  | Run once a year at midnight of January 1                         | 0 0 1 1 *

Example

  • Monthly schedule:
    • schedule_interval='@monthly'
    • schedule_interval='0 0 1 * *'
  • Hourly schedule:
    • schedule_interval='@hourly'
    • schedule_interval='0 * * * *'
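
Putting this together, here is a minimal sketch of instantiating a DAG (the dag_id, description, and dates are illustrative assumptions, and default_args is the dictionary defined in Step 2):

with DAG(
    dag_id="monthly_report",           # must be unique across your Airflow instance
    default_args=default_args,
    description="Runs at midnight on the first day of each month",
    start_date=datetime(2015, 1, 1),
    schedule_interval="@monthly",      # or the equivalent cron expression '0 0 1 * *'
    catchup=False,                     # don't backfill past runs in this example
) as dag:
    ...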

Step 4: Defining the tasks

The next step is to lay out all the tasks in the workflow.

A node in the DAG represents a task. Tasks are arranged into DAGs and then have upstream and downstream dependencies set between them to express the order in which they should run.

There are three basic kinds of tasks:

  • Operators – predefined task templates that you can string together quickly to build most parts of your DAGs.
  • Sensors – a special subclass of Operators which are entirely about waiting for an external event to happen.
  • TaskFlow-decorated @task – a custom Python function packaged up as a Task.

Example:

# Placeholder callables for illustration
def method_1():
    print("Executing task_1")

def method_2():
    # Branching callable: returns the task_id of the branch to follow
    return "task_3"

task_1 = PythonOperator(
    task_id="task_1",
    python_callable=method_1,
)

task_2 = BranchPythonOperator(
    task_id="task_2",
    python_callable=method_2,
)

task_3 = BashOperator(
    task_id="task_3",
    bash_command="echo 'This is Task 3'",
)
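
For completeness, the TaskFlow style mentioned above declares a task by decorating a plain Python function; a minimal sketch, assuming Airflow 2.x and that the call is made inside a DAG context like the ones above (extract is a hypothetical function name):

from airflow.decorators import task

@task
def extract():
    # A custom Python function packaged up as a Task
    return {"order_value": 42}

# Calling the decorated function inside a `with DAG(...)` block creates the task
extract_task = extract()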

Step 5: Defining dependencies

The key part of using Tasks is defining how they relate to each other – their dependencies, or, as we say in Airflow, their upstream and downstream tasks. First declare your Tasks, then declare the dependencies between them.

There are two ways of declaring dependencies. The first uses the >> and << (bitshift) operators:

first_task >> second_task >> [third_task, fourth_task]

Or the more explicit set_downstream and set_upstream methods:

first_task.set_downstream(second_task)
third_task.set_upstream(second_task)

Example:

  • task_2 and task_3 depend on task_1
    • task_1.set_downstream([task_2, task_3])
    • task_1 >> [task_2, task_3]
    • [task_2, task_3] << task_1

Code

The complete example below puts together all of the steps above for creating a DAG in Apache Airflow.
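
This is a minimal, self-contained sketch under the assumptions used in the earlier snippets (placeholder callables method_1 and method_2, and an illustrative dag_id, dates, and schedule):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator


def method_1():
    print("Executing task_1")


def method_2():
    # Branching callable: returns the task_id of the branch to follow
    return "task_3"


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="my_first_dag",             # illustrative name; must be unique
    default_args=default_args,
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    task_1 = PythonOperator(
        task_id="task_1",
        python_callable=method_1,
    )

    task_2 = BranchPythonOperator(
        task_id="task_2",
        python_callable=method_2,
    )

    task_3 = BashOperator(
        task_id="task_3",
        bash_command="echo 'This is Task 3'",
    )

    # task_2 depends on task_1; task_3 depends on task_2 (the branch chosen by method_2)
    task_1 >> task_2 >> task_3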

Viewing DAG in Airflow

After running the code, open localhost:8080 in your browser.
You will see various DAGs, including the example DAGs that ship with Airflow.
Click on your DAG.

After clicking, you will get a detailed view of the tasks.

You can also check the Graph View for a better visualization of the tasks and their dependencies.

Similarly, you can check other views such as Calendar, Task Duration, Gantt, Code, etc.

I hope you now have a good understanding of DAG creation in Apache Airflow. Stay tuned for the next part.

Read the Apache Airflow documentation for more details.

To gain more information visit Knoldus Blogs.

Written by 

Kuldeep is a Software Consultant at Knoldus Software LLP. He has a sound knowledge of various programming languages like C, C++, Java, MySQL, and various frameworks like Apache Kafka and Spring/Springboot. He is passionate about daily and continuous improvement.
