In my previous blog, I discussed Airflow, a workflow manager. In this blog, we will write a DAG for Airflow that defines a workflow of tasks and their dependencies. Before writing the DAG file, we will first look at the operators that can be used in a DAG.
An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs. Airflow provides built-in operators for many common tasks, including:
- BashOperator – executes a bash command
- PythonOperator – calls an arbitrary Python function
- EmailOperator – sends an email
- SimpleHttpOperator – sends an HTTP request
- SqliteOperator – executes SQL code in a specific SQLite database
- DockerOperator – executes a command inside a Docker container
- MySqlOperator – executes SQL code in a specific MySQL database
- PostgresOperator – executes SQL code in a specific Postgres database
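To make the "ideally idempotent" point concrete, here is a minimal plain-Python sketch that does not need Airflow. The function names and the file name are made up for illustration. The only idea it shows is that a task which overwrites its output can be re-run safely, while one that appends to its output cannot.

```python
import tempfile
from pathlib import Path


def write_report(path: Path, content: str) -> None:
    """Idempotent: overwrites the file, so a re-run leaves the same result."""
    path.write_text(content)


def append_report(path: Path, content: str) -> None:
    """Not idempotent: every run adds another copy of the content."""
    with path.open("a") as handle:
        handle.write(content)


def run_twice(task, content: str) -> str:
    """Run a task twice against a fresh file and return the final file content."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        target = Path(tmp_dir) / "report.txt"
        task(target, content)
        task(target, content)
        return target.read_text()
```

Calling `run_twice(write_report, "slots: 3\n")` returns the content once, while `run_twice(append_report, "slots: 3\n")` returns it twice. This matters when a failed task is retried.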
Now, we will begin to write our first DAG in Apache Airflow.
Given the current situation, getting vaccinated against Covid is a top priority for all of us. Logging in to the vaccine registration site and checking for available slots is tedious when many people are trying to access the same server. Wouldn’t it be great to get an email notification every morning listing the available slots and their respective venues?
Let’s write a DAG for it that fetches the data from the API, stores the result in a file, and sends that file as an email attachment to your inbox every morning.
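The fetch step needs a request URL that carries a PIN code and a date. Here is a small sketch of building that URL for any day, assuming the findByPin endpoint and the DD-MM-YYYY date format that appear in the DAG later in this post. The helper name `build_slots_url` is my own and not part of any library.

```python
from datetime import date
from urllib.parse import urlencode

# Endpoint copied from the DAG in this post (see the note at the end: it is no longer available).
BASE_URL = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/findByPin"


def build_slots_url(pincode: str, day: date) -> str:
    """Return the request URL for one PIN code and one day (DD-MM-YYYY)."""
    query = urlencode({"pincode": pincode, "date": day.strftime("%d-%m-%Y")})
    return f"{BASE_URL}?{query}"


print(build_slots_url("110001", date(2021, 5, 16)))
```

Passing `date.today()` instead of a fixed date would avoid hard-coding the day into the command.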
For the EmailOperator to work, we first need to configure SMTP in Airflow.
Make the following changes in the [smtp] section of the $AIRFLOW_HOME/airflow.cfg file:
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = vidushi
smtp_password = [16-Digit-password-from-your-mail]
smtp_port = 587
smtp_mail_from = firstname.lastname@example.org
smtp_timeout = 30
smtp_retry_limit = 5
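A typo in these settings usually shows up only when the email task fails. As a rough sanity check, the sketch below parses a config snippet with Python's standard `configparser` and reports any missing keys. It assumes the settings live under the `[smtp]` section of airflow.cfg. The helper name `missing_smtp_keys` and the choice of which keys count as required are my own, not part of Airflow.

```python
import configparser

# Keys set in this post; treating all of them as required is an assumption of this sketch.
REQUIRED_KEYS = [
    "smtp_host", "smtp_starttls", "smtp_ssl", "smtp_user",
    "smtp_password", "smtp_port", "smtp_mail_from",
]


def missing_smtp_keys(cfg_text: str) -> list:
    """Return the required [smtp] keys that are absent or empty in cfg_text."""
    # interpolation=None so a '%' in a password is not treated as a placeholder.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return list(REQUIRED_KEYS)
    return [
        key for key in REQUIRED_KEYS
        if not parser.get("smtp", key, fallback="").strip()
    ]


# Sample snippet with the password line left out on purpose.
SAMPLE = """
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = vidushi
smtp_port = 587
smtp_mail_from = firstname.lastname@example.org
"""

print(missing_smtp_keys(SAMPLE))  # ['smtp_password']
```

To check a real installation, pass the text of your own airflow.cfg to the function instead of `SAMPLE`.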
Let’s write the Python code that defines the workflow.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago

# Instantiate a DAG object; this is the starting point of any workflow.
dag = DAG(
    dag_id="vaccine_details",  # Name of the DAG
    start_date=days_ago(2),  # The date from which the DAG can start running
    schedule_interval=None,  # No schedule; we will trigger the DAG manually
)

# First task: fetch the vaccine details from the API exposed by the
# government and store the result in /tmp/vaccine.json.
# ">" overwrites the file, so repeated runs do not append to old output.
download_vaccine_details = BashOperator(
    task_id="download_vaccine_details",  # Name of the task
    bash_command=(
        "curl -X GET 'https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/findByPin?pincode=110001&date=16-05-2021' "
        "-H 'accept: application/json' | jq '.' > /tmp/vaccine.json"
    ),
    dag=dag,  # Reference to the DAG variable
)

# Second task: print the file content in the task log.
notify_user = BashOperator(
    task_id="notify_user",
    bash_command="cat /tmp/vaccine.json",
    dag=dag,
)

# Third task: send the file as an email attachment.
email = EmailOperator(
    task_id="send_email",
    to="email@example.com",
    subject="Vaccine Updates in your area",
    html_content="""<h3>Vaccine updates in your area</h3>""",
    files=["/tmp/vaccine.json"],
    dag=dag,  # Reference to the DAG variable
)

# Set the order of execution of tasks.
download_vaccine_details >> notify_user >> email
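If the `>>` syntax in the last line looks unusual, it is ordinary Python operator overloading. Airflow operators implement `__rshift__` so that `a >> b` marks `b` as downstream of `a`. The toy model below is not Airflow code. The `Task` class and `execution_order` helper are simplified stand-ins written only to show why the chain reads left to right.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only tracks dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other: "Task") -> "Task":
        # `a >> b` records b as downstream of a and returns b,
        # which is what lets `a >> b >> c` chain left to right.
        self.downstream.append(other)
        return other


def execution_order(first: Task) -> list:
    """Follow a linear chain of tasks and return their ids in run order."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


download = Task("download_vaccine_details")
notify = Task("notify_user")
email = Task("send_email")
download >> notify >> email

print(execution_order(download))
# ['download_vaccine_details', 'notify_user', 'send_email']
```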
Viewing the DAG in the Airflow web UI:
Open localhost:8080 in your web browser. The Airflow dashboard will be displayed; it contains several example DAGs that ship with Airflow to help new users get started. Find your DAG by the dag_id you created.
Click on the DAG to have a detailed look at the tasks.
Here we have three tasks: download_vaccine_details, notify_user and send_email.
Turn on the DAG.
The schedule interval is set to None, so we will manually trigger the DAG.
Click on the Trigger DAG button. The workflow will begin.
You can also check the Graph View for a better visualization of the tasks and their dependencies.
All three tasks are successful. You can also check logs of these tasks and retry a task in case of failure.
Task Duration shows how long each task took to run.
Task Tries shows the number of tries each task needed to succeed.
Landing Times allows you to compare how tasks have performed over time.
Gantt is another way of visualizing the tasks.
Details shows the DAG details, including Schedule Interval, Start Date, End Date, max active runs, default args, Concurrency, FilePath, owner, and tags.
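The DAG in this post does not set any default args, so the number of tries will normally be one. As a hedged sketch, this is roughly what a `default_args` dictionary with retries could look like. `retries` and `retry_delay` are standard operator arguments, but the values here are arbitrary examples, and `max_attempts` is a hypothetical helper added only to show the arithmetic.

```python
from datetime import timedelta

# Example values only; tune them for your own workflow.
default_args = {
    "owner": "airflow",
    "retries": 2,                         # retry a failed task up to 2 more times
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between tries
}


def max_attempts(args: dict) -> int:
    """Total tries a task can take: the first run plus its retries."""
    return 1 + args.get("retries", 0)


print(max_attempts(default_args))  # 3
```

Such a dictionary would be passed as `default_args=default_args` when creating the DAG, and the values then appear on the Details page.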
[Note: The above API is no longer available. It serves only as an example of fetching data from an API and incorporating it into a workflow. You can create many similar workflows to get started.]