Creating a DAG in Apache Airflow

Reading Time: 5 minutes

In my previous blog, I discussed the introduction to Apache Airflow. In this blog, we will learn how to create a DAG for Airflow that defines a workflow of tasks and their dependencies.

What is a DAG?

First of all, the question that comes to mind is: what is a DAG? In Airflow, a DAG – or Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. In simple terms, it is a graph with nodes, directed edges, and no cycles.

A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

Declaring a DAG

Now that we know what a DAG is, the next question is how to declare one. There are three ways to declare a DAG. First, you can use a context manager, which will implicitly add the DAG to anything inside it:

with DAG("my_dag_name") as dag:
    op = DummyOperator(task_id="task")

Or, you can use a standard constructor, passing the dag into any operators you use:

my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)

Or, you can use the @dag decorator to turn a function into a DAG generator:

@dag(start_date=days_ago(2))
def generate_dag():
    op = DummyOperator(task_id="task")

dag = generate_dag()

DAGs are nothing without Tasks to run, and those will usually come in the form of Operators, Sensors, or TaskFlow-decorated functions.
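
For completeness, the snippets above assume imports roughly along the following lines; the exact module paths depend on your Airflow version, and the @dag decorator is only available from Airflow 2.0 onwards:

from airflow import DAG
from airflow.decorators import dag                  # @dag decorator, Airflow 2.0+
from airflow.operators.dummy import DummyOperator   # airflow.operators.dummy_operator in Airflow 1.10.x
from airflow.utils.dates import days_ago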

Define your first DAG

Now that our basics about DAGs are clear, let's start and define our first DAG.

In this section, we will create a workflow whose first step prints “Getting Live Cricket Scores” on the terminal, and whose second step uses an API to print the live scores. Let’s test the API first; for that, you need to install the cricket-cli library using the following command.

sudo pip3 install cricket-cli

Now, run the following command and get the scores.

cricket scores

Importing the Libraries

Now, we will create the same workflow using Apache Airflow. The code to define the DAG will be written entirely in Python. Let’s start by importing the libraries that we need. We will use only the BashOperator, as our workflow requires nothing beyond Bash commands.

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

Defining DAG Arguments

For each DAG, we need to pass a dictionary of default arguments. Here is a description of some of the arguments that you can pass:

  • owner: The name of the owner of the workflow; it should be alphanumeric and can contain underscores, but should not contain any spaces.
  • depends_on_past: If each run of your workflow depends on data from the previous run, mark it as True; otherwise mark it as False.
  • start_date: The start date of your workflow.
  • email: Your email ID, so that you can receive an email whenever any task fails for any reason.
  • retry_delay: If any task fails, how long it should wait before retrying it.

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'bhavya',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
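
The tasks in the next section are attached to a DAG object via dag=dag, so we also need to instantiate that DAG with the default arguments defined above. A minimal instantiation looks like this; the dag_id, description, and schedule_interval below are example choices for this post:

# instantiate the DAG that our tasks will be attached to
dag = DAG(
    'live_cricket_scores',                  # dag_id shown in the web UI (example name)
    default_args=default_args,
    description='Print live cricket scores on the terminal',
    schedule_interval=timedelta(days=1),    # run once a day
)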

Defining the Tasks

We will have 2 tasks for our workflow:

  1. print: In the first task, we will print “Getting Live Cricket Scores!!!” on the terminal using the echo command.
  2. get_cricket_scores: In the second task, we will print the live cricket scores using the library that we have installed.

Now, while defining a task, we first need to choose the right operator for it. Here, both commands are terminal-based, so we will use the BashOperator for both.

We will pass the task_id, which is a unique identifier of the task; you will see this name on the nodes of the Graph View of your DAG. We also pass the bash command that we want to run and, finally, the DAG object to which we want to link the task.

Finally, create the pipeline by adding the “>>” operator between the tasks.

# define the first task
t1 = BashOperator(
    task_id='print',
    bash_command='echo Getting Live Cricket Scores!!!',
    dag=dag,
)


# define the second task
t2 = BashOperator(
    task_id='get_cricket_scores',
    bash_command='cricket scores',
    dag=dag,
)

# task pipeline
t1 >> t2
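
The “>>” operator is shorthand for set_downstream, so the same pipeline could also be written as:

# equivalent to t1 >> t2
t1.set_downstream(t2)

Save the complete script as a Python file in your Airflow dags folder (by default ~/airflow/dags) so that the scheduler can pick it up.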

Update the DAGs in the Web UI

Now, refresh the user interface and you will see your DAG in the list. Turn on the toggle to the left of the DAG and then trigger it.
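
If you prefer the command line, you can also unpause and trigger the DAG with the Airflow CLI (commands shown in the 1.10.x style matching the imports above; in Airflow 2.x they are airflow dags unpause and airflow dags trigger). The dag_id here assumes the example name chosen earlier:

airflow unpause live_cricket_scores
airflow trigger_dag live_cricket_scores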

Click on the DAG and open the Graph View. Each of the steps in the workflow appears in a separate box, and its border turns dark green once it has completed successfully.

Now, click on View Log to see the output of your code.
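
While debugging, you can also run a single task locally, without the scheduler, and see its output directly in the terminal (1.10.x syntax shown; in Airflow 2.x the command is airflow tasks test). The date is just an example execution date, and the dag_id again assumes the name chosen above:

airflow test live_cricket_scores get_cricket_scores 2021-07-01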

That’s it. You have successfully created your first DAG in Apache Airflow.

Conclusion

In this article, we have seen the features of Apache Airflow and its user interface components, and we have created a simple DAG. In the upcoming blogs, we will discuss some more concepts like variables and branching, and we will create a more complex workflow.

For more awesome Tech Blogs on various other technologies, please visit Knoldus Blogs.

Written by 

Bhavya is a Software Intern at Knoldus Inc. He has completed his graduation from IIMT College of Engineering. He is passionate about Java development and curious to learn Java Technologies.