
Airflow dynamic DAGs can save you a ton of time. As you know, Apache Airflow is written in Python, and DAGs are created via Python scripts. That makes it very flexible and powerful (even complex sometimes). By leveraging Python, you can create DAGs dynamically based on variables, connections, a typical pattern, etc. This very nice way of generating DAGs comes at the price of higher complexity and some subtle, tricky things that you must know about.
Use Case
Why might you need dynamic DAGs? It’s a good question. Let’s find out through an example.
Let’s imagine that you have a DAG that extracts, processes, and stores statistics derived from your data.



Very simple DAG. Now, let’s say this DAG has different configuration settings. For example:
- source (could be a different FTP server, API route, etc.)
- statistics (could be the mean, median, standard deviation, all of them, or only one of those)
- destination table (could be a different table for each API route, folder, etc.)
Also, you could have different settings for each of your environments: dev, staging, and prod.
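To make this more concrete, here is a purely illustrative sketch of what those settings could look like for two environments (all names and values below are made up):

# Hypothetical settings, one entry per environment
config = {
    "dev": {
        "source": "ftp://dev-ftp-server/data",
        "statistics": ["mean"],
        "destination_table": "stats_dev",
    },
    "prod": {
        "source": "https://api.example.com/metrics",
        "statistics": ["mean", "median", "std"],
        "destination_table": "stats_prod",
    },
}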
The bottom line is that you don't want to create the same DAG and the same tasks over and over again with just slight modifications. This is where the concept of dynamic DAGs comes in: it solves exactly this kind of problem, starting with the duplication of code.
Methods for generating DAGs dynamically
There are two methods through which we can generate DAGs dynamically:
- The single-file method
- The multiple-file method
Let's now take a deeper dive into these methods to understand the dynamic DAGs concept with more clarity.
The single-file method
The single-file method is the easiest way to generate DAGs dynamically. With this method, you have:
- a single Python file
- a function that returns a DAG
- input parameters
- a loop that generates the DAGs
Below is the example code for the single-file method.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime


def create_dag(symbol):
    # Build one DAG per symbol; the symbol is injected into the DAG id and the tasks
    with DAG(f'get_price_{symbol}', start_date=datetime(2022, 1, 1),
             schedule_interval='@daily', catchup=False) as dag:

        @task
        def extract(symbol):
            return symbol

        @task
        def process(symbol):
            return symbol

        @task
        def send_email(symbol):
            print(symbol)
            return symbol

        send_email(process(extract(symbol)))

    return dag


# Register each generated DAG in the global namespace so the scheduler can discover it
for symbol in ("APPL", "FB", "GOOGL"):
    globals()[f"dag_{symbol}"] = create_dag(symbol)
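Note that the inputs do not have to be hard-coded in the loop. As mentioned in the introduction, you can also generate DAGs from Airflow Variables, connections, and so on. For instance, here is a minimal sketch that reads the symbols from a hypothetical Airflow Variable named symbols, expected to hold a JSON list:

from airflow.models import Variable

# Hypothetical Variable "symbols" storing a JSON list such as ["APPL", "FB", "GOOGL"];
# default_var is returned as-is if the Variable does not exist yet
symbols = Variable.get("symbols", default_var=["APPL", "FB", "GOOGL"], deserialize_json=True)

for symbol in symbols:
    globals()[f"dag_{symbol}"] = create_dag(symbol)

Keep in mind that this top-level code runs every time the file is parsed by the scheduler, so the call to Variable.get hits the metadata database at every parse.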



This method has some drawbacks, though, which is why the second method is usually recommended instead:
- You have no visibility into the code of the generated DAGs. The example we use is quite easy, but imagine that you have a lot of tasks with many different inputs. Without being able to look at the generated code, debugging your DAGs may become really hard.
- DAGs in the folder dags/ are parsed every min_file_process_interval. By default, the value is set to 30 seconds. That means that every 30 seconds your DAGs are regenerated. If you have a lot of DAGs to create, that may lead to serious performance issues. By the way, increasing the value means changes made to your DAGs will take more time to be reflected.
The multiple-file method
This is the recommended method, as it solves the drawbacks we just saw with the single-file method. It relies on three things:
- a template file (the DAG skeleton)
- an input file (where your inputs are)
- a script file, in charge of generating your DAGs by merging the inputs with the template
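For example, assuming the paths used by the script shown later in this post (an include/dynamic_dag/ folder outside of dags/, which is only a convention), the project could be laid out like this:

include/dynamic_dag/
    template.py            # the DAG skeleton
    config_dag_appl.json   # one input file per DAG to generate
    config_dag_fb.json
    config_dag_googl.json
    generate_dag.py        # the script that merges the inputs with the template
dags/
    get_price_APPL.py      # the generated DAGs end up here
    get_price_FB.py
    get_price_GOOGL.py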
Advantages
There are many advantages to using this method, including:
- It is less prone to errors. Typically, the script is part of a CI/CD pipeline.
- It is scalable. Your DAGs are generated once, not every 30 seconds.
- You have full access to the generated code, which makes it easier to debug.
- In short, it is a more reliable method.
Airflow Dynamic DAGs with JSON files
Maybe one of the most common ways of using this method is with JSON inputs/files. Let's see how.
1. The first step is to create the template file: the DAG from which you will derive the others by adding the inputs. Notice that you should put this file outside of the dags/ folder.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG("get_price_DAG_ID_HOLDER", start_date=datetime(2022, 1, 1),
         schedule_interval="SCHEDULE_INTERVAL_HOLDER", catchup=False) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def send_email(symbol):
        print(symbol)
        return symbol

    send_email(process(extract(INPUT_HOLDER)))
As you can see, it’s a pretty simple DAG with placeholders such as DAG_ID_HOLDER, INPUT_HOLDER or SCHEDULE_INTERVAL_HOLDER. Those placeholders will be replaced by the corresponding values in the JSON files.
2. The second step is to create the JSON files.
config_dag_appl.json
{
    "dag_id": "APPL",
    "schedule_interval": "@daily",
    "input": "126"
}
config_dag_fb.json
{
    "dag_id": "FB",
    "schedule_interval": "@daily",
    "input": "198"
}
config_dag_googl.json
{
    "dag_id": "GOOGL",
    "schedule_interval": "@daily",
    "input": "3243"
}
Basically, for each DAG you want to generate, there is an associated JSON file.
3. The third and last step is to create the script that will replace the placeholders in the template with the values from the config files and generate the DAGs.
import json
import os
import shutil
import fileinput

TEMPLATE_FILE = 'include/dynamic_dag/template.py'

for filename in os.listdir('include/dynamic_dag/'):
    if filename.endswith('.json'):
        with open(f"include/dynamic_dag/{filename}") as config_file:
            config = json.load(config_file)

        # Copy the template into the dags/ folder, named after the DAG id
        new_dagfile = f"dags/get_price_{config['dag_id']}.py"
        shutil.copyfile(TEMPLATE_FILE, new_dagfile)

        # Replace the placeholders in the copied file, in place
        for line in fileinput.input(new_dagfile, inplace=True):
            line = line.replace("DAG_ID_HOLDER", config['dag_id'])
            line = line.replace("SCHEDULE_INTERVAL_HOLDER", config['schedule_interval'])
            line = line.replace("INPUT_HOLDER", config['input'])
            print(line, end="")
All right. Everything is ready, time to test!
Run the script with the command
python include/dynamic_dag/generate_dag.py
and you should obtain three new DAG files as shown below:
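For example, dags/get_price_APPL.py should now contain the template code with the placeholders replaced by the values from config_dag_appl.json:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG("get_price_APPL", start_date=datetime(2022, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def send_email(symbol):
        print(symbol)
        return symbol

    send_email(process(extract(126)))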



Conclusion
In this blog, we learned how to create dynamic DAGs in two different ways: the single-file method and the multiple-file method. I personally recommend the multiple-file method if you run Airflow in production, as it is really the most reliable and scalable way. All right, that's it for now!
For more tech blogs, please visit Knoldus Blogs.