What is a DAG in Apache Airflow?
In this blog, we are going to look at the basic structure of a DAG in Apache Airflow, and we will also configure our first data pipeline.
DAG stands for Directed Acyclic Graph: a graph with nodes, directed edges, and no cycles. In Apache Airflow, a DAG represents a data pipeline. For example:
This is a data pipeline, i.e. a DAG. You have four tasks: T1, T2, T3, and T4. The tasks are the nodes, and the arrows in the diagram above are the directed edges corresponding to the dependencies between your tasks. Task T1 must be executed first, and then T2, T3, and T4.
In this DAG, there is no cycle or loop. But in the diagram below there is a loop: T2 depends on T1, but T1 also depends on T2. That is not a valid DAG, and in Airflow you will receive an error.
So keep in mind that a DAG is a data pipeline in Airflow where the nodes are the tasks and the directed edges are the dependencies between those tasks.
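To make this concrete, here is a minimal sketch of that pipeline as Airflow code (the DAG id, date, and echo commands are placeholders for illustration; the real pipeline is built step by step below):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("example_structure", start_date=datetime(2022, 2, 22),
         schedule_interval="@daily", catchup=False) as dag:
    # the four nodes of the graph
    t1 = BashOperator(task_id="t1", bash_command="echo T1")
    t2 = BashOperator(task_id="t2", bash_command="echo T2")
    t3 = BashOperator(task_id="t3", bash_command="echo T3")
    t4 = BashOperator(task_id="t4", bash_command="echo T4")

    # the directed edges: T1 runs first, then T2, T3, and T4
    t1 >> [t2, t3, t4]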
The second important concept that we should know is the concept of operators.
What is an Operator in Apache Airflow?
An operator is nothing more than a task in our DAG. Let’s take the below data pipeline as an example:
Here T1, T2, T3, and T4 are tasks. Let's say T2 executes a Python function, T3 executes a bash command, and T4 inserts data into a database. T2, T3, and T4 are tasks, but behind the scenes each task is actually an operator. If you want to execute a Python function, you use the PythonOperator; if you want to execute a bash command, you use the BashOperator; and if you want to insert data into a Postgres database, you might use the PostgresOperator.
So, depending on the action you want to trigger from your DAG, you use the corresponding operator, and Airflow ships with a ton of different operators. Always keep in mind that an operator is nothing more than an object encapsulating the job you want to run in your DAG. Each time you add an operator to your DAG, you are in fact adding a new task.
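For example, here is roughly how those three kinds of tasks could be declared (a minimal sketch; the DAG id, connection id, SQL statement, and function body are hypothetical, and the PostgresOperator assumes the Postgres provider package is installed):

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# assumes the apache-airflow-providers-postgres package is installed
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime


def _my_python_function():
    # hypothetical Python function executed by T2
    print("running T2")


with DAG("operator_examples", start_date=datetime(2022, 2, 22),
         schedule_interval="@daily", catchup=False) as dag:
    t2 = PythonOperator(task_id="t2", python_callable=_my_python_function)
    t3 = BashOperator(task_id="t3", bash_command="echo 'running T3'")
    t4 = PostgresOperator(
        task_id="t4",
        postgres_conn_id="my_postgres",         # hypothetical connection id
        sql="INSERT INTO results VALUES (1);",  # hypothetical SQL statement
    )

Each of these objects is a task in the DAG. With these concepts in place, let's jump into the code.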
Coding Your First Data Pipeline
Note: Make sure apache-airflow is up and running in the background.
Step 1: Create folder, sub-folder, and .py file
Create a folder named “apache-airflow” and then create a sub-folder named dags inside it. Inside the dags folder, create a file and name it my_dag.py. (You can use whatever names you like for the files and folders.)
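With the names used in this post, the layout looks like:

apache-airflow/
└── dags/
    └── my_dag.py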
The first and most important part of creating your data pipeline is getting all the correct imports. One import you are always going to use is the DAG class.
Step 2: Import required classes
Import all the required classes/libraries; the snippet after this list shows the result.
- from airflow import DAG: Always import the DAG class, as this file actually defines a DAG, i.e. a data pipeline.
- from datetime import datetime: A data pipeline expects a start date from which it is scheduled.
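Put together, the top of my_dag.py starts with these two imports:

from airflow import DAG        # the DAG class used to define the data pipeline
from datetime import datetime  # used to build the start date of the pipeline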
Step 3: Creating an instance of the DAG class
Create an instance of the DAG class. To create the instance, we can use a context manager, i.e. the with statement.
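A minimal sketch of the context manager looks like this (the parameters themselves are filled in over the next steps):

from airflow import DAG

with DAG("my_dag") as dag:
    # every task defined inside this block is automatically attached to this DAG
    pass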
Once we have the DAG instance, we have to provide some parameters.
Step 4: Adding parameters
The first one is the DAG id, which must be a unique identifier, otherwise you might end up with weird behaviors. The second parameter we have to provide is the start date.
What is a start date? The start date is the date from which your DAG starts being scheduled.
Example: with DAG(“my_dag”, start_date=datetime(2022, 2, 22))
You have to specify a datetime object (note the argument order: year, month, day) and then put in whatever date you want. The date above means that the DAG is scheduled to start on 22nd February 2022.
There are two additional parameters that can be really useful, i.e. schedule_interval and catchup.
Step 5: Adding additional parameters
schedule_interval: The schedule interval defines the frequency at which your DAG is triggered. For example, if you want to trigger your DAG every 10 minutes, you specify that in the schedule interval. You can do this with a cron expression, which is the standard Unix way of defining time intervals. Airflow also brings some presets for common cron expressions: to trigger a DAG every day, you can use the preset @daily. There are a lot more presets available in Airflow, which we will see later.
catchup: Each time the DAG is triggered, a DAG Run object is created. A DAG Run is an instance of the DAG running at a given date. By default, Airflow tries to run all the non-triggered DAG Runs between the start date and the current date. If you set the start date to one year ago with a daily schedule_interval, a lot of DAG Runs will be triggered at once. To prevent this, we use the catchup parameter and set it to False. That way, only the latest non-triggered DAG Run is triggered by Airflow.
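Putting the parameters from steps 4 and 5 together, the DAG declaration looks like this (the cron expression in the comment is only an illustration of the every-10-minutes case mentioned above):

from airflow import DAG
from datetime import datetime

with DAG("my_dag",
         start_date=datetime(2022, 2, 22),
         schedule_interval="@daily",  # preset for once a day; "*/10 * * * *" would mean every 10 minutes
         catchup=False) as dag:       # do not backfill the non-triggered DAG Runs since the start date
    pass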
Important note on scheduling and presets:
Take this DAG with a start date of 22nd February 2022 and a schedule interval of daily. The DAG is effectively triggered at the start date plus the schedule interval, i.e. on 23rd February 2022 at midnight: since the schedule interval is set to daily, the first run happens one day after the start date. That is how DAGs are scheduled in Airflow.
Step 6: Implement the Tasks
To add a task to your DAG, import the operator. Here we will use the PythonOperator to execute a Python function. After importing the operator, we can define the task. We have to set two important things: a task_id, which must be unique, and a python_callable, the Python function you want to call from that task.
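Inside the with DAG(...) block, a task defined with the PythonOperator looks like this (taken from the complete code below):

from airflow.operators.python import PythonOperator

training_model_A = PythonOperator(
    task_id="training_model_A",       # must be unique within the DAG
    python_callable=_training_model   # the Python function to call
)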
Step 7: Create the Python functions
Now, create the Python functions that you referenced in python_callable. Define all the functions your tasks are going to execute.
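For this pipeline, the simplest callable is the fake training function from the complete code below (_choose_best_model, the callable of the branch task, is defined the same way):

from random import randint

def _training_model():
    # simulate a training step by returning a fake accuracy between 1 and 10
    return randint(1, 10)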
Step 8: Specify the dependencies
In order to set the order of execution of your tasks, you have to specify the dependencies as the last step of your pipeline creation.
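In this pipeline, the dependencies are declared with the bit-shift operator >>, exactly as in the last line of the complete code:

# the three training tasks run in parallel, then the branch task decides
# whether 'accurate' or 'inaccurate' runs last
[training_model_A, training_model_B, training_model_C] >> choose_best_model >> [accurate, inaccurate]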
Result
Above is the tree view of the DAG with DAG id my_dag. We can also look at the graph view of the same DAG.
Complete Code
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from random import randint
from datetime import datetime


def _choose_best_model(ti):
    # pull the accuracies returned (pushed to XCom) by the three training tasks
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    best_accuracy = max(accuracies)
    if best_accuracy > 8:
        return 'accurate'
    return 'inaccurate'


def _training_model():
    # simulate a training step by returning a fake accuracy between 1 and 10
    return randint(1, 10)


with DAG("my_dag", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:

    training_model_A = PythonOperator(
        task_id="training_model_A",
        python_callable=_training_model
    )

    training_model_B = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model
    )

    training_model_C = PythonOperator(
        task_id="training_model_C",
        python_callable=_training_model
    )

    # branch task: decides which downstream task runs based on the best accuracy
    choose_best_model = BranchPythonOperator(
        task_id="choose_best_model",
        python_callable=_choose_best_model
    )

    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo 'accurate'"
    )

    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo 'inaccurate'"
    )

    # dependencies: training tasks first, then the branch, then one of the two results
    [training_model_A, training_model_B, training_model_C] >> choose_best_model >> [accurate, inaccurate]
Conclusion
The above pipeline checks a fake accuracy. Each training task generates a random number between 1 and 10, and whenever the best number is greater than 8, we get “accurate” as the result. So here we have learnt how to create our first data pipeline and dug a lot deeper into DAGs.
For more tech blogs, please visit Knoldus Blogs.