Creating a DAG in Apache Airflow

Reading Time: 4 minutes

In my previous blog, I discussed Airflow – A Workflow Manager. In this blog, we will write a DAG for Airflow that defines a workflow of tasks and their dependencies. Before writing the DAG file, let's first look at the operators that can be used while writing a DAG.

Airflow Operators

An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs. Airflow provides built-in operators for many common tasks, including:

  • BashOperator – executes a bash command
  • PythonOperator – calls an arbitrary Python function
  • EmailOperator – sends an email
  • SimpleHttpOperator – sends an HTTP request
  • SqliteOperator – executes SQL code in a specific SQLite database
  • DockerOperator – executes a command inside a Docker container
  • MySqlOperator – executes SQL code in a specific MySQL database
  • PostgresOperator – executes SQL code in a specific Postgres database
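
To get a feel for the PythonOperator, note that the callable it runs is just an ordinary Python function. Here is a minimal sketch (the function and task names are illustrative, not from Airflow itself; the commented-out wiring assumes a running Airflow 2 installation):

```python
def greet(name: str) -> str:
    """An ordinary function; a PythonOperator would log its return value."""
    return f"Hello, {name}!"

# Wiring it into a DAG requires a running Airflow installation, e.g.:
# from airflow.operators.python import PythonOperator
# greet_task = PythonOperator(task_id="greet",
#                             python_callable=greet,
#                             op_args=["Airflow"],
#                             dag=dag)

print(greet("Airflow"))  # prints "Hello, Airflow!"
```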

Now, we will begin to write our first DAG in Apache Airflow. 

Use Case

Given the current scenario, getting vaccinated against Covid is one of the most important tasks for us. Logging in to the vaccine registration site and checking the available slots is tedious when several people are trying to access the same server. Wouldn’t it be great to get an email notification every morning describing the available slots and their respective venues?

Let’s write a DAG for it that fetches the data from the API, stores the result in a file, and sends that file as an email attachment to your mail every morning.
For the EmailOperator to work, we first need to configure SMTP in Airflow.
Make the following changes in the $AIRFLOW_HOME/airflow.cfg file:

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = vidushi
smtp_password = [16-Digit-password-from-your-mail]
smtp_port = 587
smtp_mail_from = vidushi.bansal@knoldus.com
smtp_timeout = 30
smtp_retry_limit = 5
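
Before wiring these credentials into Airflow, you can sanity-check them with Python's standard smtplib; a minimal sketch mirroring the [smtp] settings above (the helper name and the commented-out Gmail call are illustrative; use your own user and app password):

```python
import smtplib

def smtp_login_ok(host: str, port: int, user: str, password: str) -> bool:
    """Return True if the SMTP server accepts a STARTTLS login."""
    try:
        with smtplib.SMTP(host, port, timeout=10) as server:
            server.starttls()
            server.login(user, password)
        return True
    except (smtplib.SMTPException, OSError):
        return False

# Mirrors the config above (Gmail on port 587 with STARTTLS):
# smtp_login_ok("smtp.gmail.com", 587, "vidushi", "your-16-digit-app-password")
```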

Let’s write the Python code defining the workflow.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email_operator import EmailOperator

# Instantiate a DAG object; this is the starting point of any workflow.
dag = DAG(
   dag_id="vaccine_details",   # Name of the DAG
   start_date=airflow.utils.dates.days_ago(2),  # The date at which the DAG should first start running
   schedule_interval=None,  # We will manually trigger the DAG
)

# Creating first task
download_vaccine_details = BashOperator(
   task_id="download_vaccine_details", #Name of the task
   #Curl request to fetch the vaccine details from the API exposed by the government
   #and store the result in the /tmp/vaccine.json file (requires jq on the worker).
   bash_command="curl -X GET 'https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/findByPin?pincode=110001&date=16-05-2021' -H  'accept: application/json' | jq >> /tmp/vaccine.json",
   dag=dag, #Reference to the DAG variable
)

#Creating second task
notify_user = BashOperator(
   task_id="notify_user",
   # Print the file content in the task log 
   bash_command='cat /tmp/vaccine.json',
   dag=dag,
)

#Creating third task
email = EmailOperator(
   task_id='send_email',
   to='vidushi.bansal@knoldus.com',
   subject='Vaccine Updates in your area',
   html_content=""" <h3>Vaccine updates in your area</h3> """,
   files=['/tmp/vaccine.json'],
   dag=dag #Reference to the DAG variable
)

# Set the order of execution of tasks. 
download_vaccine_details >> notify_user >> email
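
The notify_user task above only dumps the raw file into the task log. If you wanted a readable summary instead, the saved response is JSON, so a small parser could be handed to a PythonOperator. A hedged sketch (the "sessions", "name", and "available_capacity" field names are assumptions about the CoWIN findByPin response shape):

```python
import json

def summarize_slots(path: str) -> str:
    """Build a one-line-per-center summary from the saved vaccine.json.

    Assumes the response carries a top-level "sessions" list whose entries
    have "name" and "available_capacity" fields.
    """
    with open(path) as f:
        data = json.load(f)
    lines = [
        f'{s["name"]}: {s["available_capacity"]} slots'
        for s in data.get("sessions", [])
    ]
    return "\n".join(lines) if lines else "No open slots today."
```

You could call this from a PythonOperator with op_args=["/tmp/vaccine.json"] and let Airflow log the returned summary.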

Viewing DAG in Web UI of Airflow:

Open localhost:8080 in your web browser. The Airflow dashboard will be displayed, which contains several example DAGs shipped with Airflow for a new user to begin learning. Find the DAG by the dag_id you created.

Click on the DAG to have a detailed look at the tasks.

Here we have three tasks: download_vaccine_details, notify_user, and send_email.
Turn on the DAG.

The schedule_interval is set to None, so we will trigger the DAG manually.


Click on the Trigger DAG button. The workflow will begin.
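
Besides the UI button, Airflow 2's stable REST API (when enabled) can trigger a DAG run programmatically. A sketch, assuming the default localhost:8080 webserver and basic-auth credentials (both are assumptions about your deployment):

```python
import json

dag_id = "vaccine_details"
url = f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns"
payload = json.dumps({"conf": {}})  # optional run-level configuration

# Requires the `requests` package and a reachable Airflow webserver:
# import requests
# resp = requests.post(url, data=payload, auth=("admin", "admin"),
#                      headers={"Content-Type": "application/json"})
# resp.raise_for_status()

print(url)
```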


You can also check the Graph View for a better visualization of tasks and their dependencies.

All three tasks are successful. You can also check the logs of these tasks and retry a task in case of failure. The other views provide more detail:

  • Task Duration – shows how long each task takes over time
  • Task Tries – shows the number of tries a task took to succeed
  • Landing Times – lets you compare how tasks have performed over time
  • Gantt – another way of visualizing the tasks
  • Details – the DAG details, including Schedule Interval, Start Date, End Date, max active runs, default args, Concurrency, FilePath, owner, and tags

[Note: The above API is no longer available for use. This is just an example of fetching data from an API and incorporating it into our workflow. You can create many such workflows to begin with.]