In this blog, we will learn how to Send Email Alerts to the user about the Task Status using Apache Airflow.
Prerequisite: Airflow SMTP Configuration
Step 1 Generate Google Application Password
Visit this link and log in with your email Id and password. When you have successfully logged in, you will see the below window:
You have to choose “Mail” as an app and select whichever device you are using from the Select device dropdown. Also, you can give the custom name of your device if you do not find a suitable device in the list.
Click on GENERATE.
NOTE: For your system, a password for the app will be issued. Make a copy of it and keep it secure. Do not share it with anyone.
Step 2 Edit the airflow.cfg file like below:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = Add_your_required_email_address
# Example: smtp_password = airflow
smtp_password = Add_16_digit_generated_password
smtp_port = 587
smtp_mail_from = Add_your_required_email_address
smtp_timeout = 30
smtp_retry_limit = 5
Automate Email Alerts for Task Status [ Code Implementation ]:
Step 1 Create a ‘dags’ folder in the airflow folder. Then, create a .py file to start your coding.
example: airflow>>dags>>taskStatusEmailAlerts.py
Strat Coding.
Step 2 Import all the necessary operators:
- datetime: It allows you to run your DAGs with time zone dependent schedules.
- timedelta: time by which the job is expected to succeed.
- DAG: All of the tasks you wish to complete, grouped in a style that demonstrates their interdependencies and interconnections.
- BashOperator: a powerful yet easy operator that allows you to run a bash script, a command, or a collection of commands from DAGs.
- airflow.utils.email: used to send emails. You can also use EmailOperator for the same.
Step 3 Write two methods i.e One for task failure email alert and other one for task success email alert:
Method for task failure Email Alert:
Method for task success Email Alert:
Step 4 Define DAG:
When you will see the airflow UI, you will see a DAG created with the name “check_task_status_pass_fail”.
Step 5 Define the tasks:
- say_hello this task will send a success email alert in case the bash command is executed successfully, otherwise it will send a failure email alert.
- open_temp_folder this task will send a success email alert in case there is a temp_folder present in your computer, otherwise it will send a failure email alert.
Step 6 Define dependencies:
As in airflow, we define multiple tasks. In order to define which task needs to be executed when, dependencies are defined.
Complete Code:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.email import send_email
def failure_function(context):
dag_run = context.get('dag_run')
msg = "The folder you are trying to open doesn't exist hence the task has Failed."
subject = f"DAG {dag_run} Failed"
send_email(to='youremailaddress@gmail.com', subject=subject, html_content=msg)
def success_function(context):
dag_run = context.get('dag_run')
msg = "Echoed Hello hence the task has executed successfully."
subject = f"DAG {dag_run} has completed"
send_email(to='youremailaddress@gmail.com', subject=subject, html_content=msg)
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 3, 21),
}
with DAG('check_task_status_pass_fail',
default_args=default_args,
description='A simple job to nofity user about the execution status of their specified tasks.',
schedule_interval='@daily',
catchup=False
) as dag:
say_hello = BashOperator(
task_id='say_hello',
on_failure_callback=failure_function,
on_success_callback=success_function,
bash_command='echo Hello'
)
open_temp_folder = BashOperator(
task_id='open_temp_folder',
on_failure_callback=failure_function,
on_success_callback=success_function,
bash_command='cd temp_folder'
)
say_hello >> open_temp_folder
Results:
Above we can see, a DAG named as “check_task_status_pass_fail” created. Click on the dag and trigger it.
After triggering the DAG, we get the below graph:
Email Alerts:
Task Success Email Alert:
Task failure Email Alert:
Conclusion
We can use Apache Airflow not only for Automating Emails, but also automating other things as well.
Example: Slack alerts for the same task.
Interested into technology blogs? Please visit Knoldus Blogs to read and dive into different technologies.