Apache Airflow: Understanding Operators

Reading Time: 5 minutes

An Operator is the building block of an Airflow DAG. It determines what will be executed when the DAG runs. Operators can be thought of as templates or blueprints that contain the logic for a predefined task, which we can declare inside an Airflow DAG.

When an operator is instantiated with its required parameters, it is known as a task. An operator defines one task in your data pipeline. Operators are generally used to integrate with other services, for example MySqlOperator, JdbcOperator, DockerOperator, etc.

Airflow also provides operators for many common tasks, including:

  • BashOperator – for executing a simple bash command
  • PythonOperator – calls an arbitrary Python function
  • EmailOperator – sends an email
  • SimpleHttpOperator – sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – for executing a SQL command
  • Sensor – waits for a certain time or for a condition to be satisfied.
Operators In Apache Airflow

The airflow/contrib/ directory contains many more operators built by the community. These operators might not be as fully baked or well tested as those in the main distribution, but they allow users to more easily add new functionality to the platform.

Properties of Operators:

  1. An operator describes a single task in a workflow. If you have two tasks in the same operator instance, consider separating them into two different instances of that same operator.
  2. An operator is usually (but not always) atomic, i.e. it can work alone and does not need to communicate or share resources with any other operator. However, operators can exchange small pieces of data using an Airflow feature called XCom (short for cross-communication).
  3. An operator should be idempotent, i.e. it should produce the same result regardless of how many times it is run.

Types of Operators:

  1. Action Operators: An action operator executes something (i.e. an action). For example, the PythonOperator executes a Python function, the BashOperator executes a bash command, etc.
  2. Transfer Operators: These operators transfer data from a source to a destination. For example, MySQLToGCSOperator.
  3. Sensors: Sensors wait for something to happen before the pipeline proceeds to the next task. More specifically, a sensor checks for a condition at a specific interval and succeeds once the condition is met; if the condition is not met, the sensor waits for another interval and then checks again. For example, HdfsSensor, FileSensor.

Apart from these, you should also know about:

BaseOperator: As the name suggests, this operator acts as the base (or parent) class for every operator in Airflow. All operators derive common functionality from BaseOperator through inheritance. BaseOperator parameters include:

  • task_id (string) – a unique, meaningful id for the task
  • owner (string) – the owner of the task, using the unix username is recommended
  • retries (int) – the number of retries that should be performed before failing the task
  • retry_delay (timedelta) – delay between retries
  • retry_exponential_backoff (bool) – allow progressively longer waits between retries by applying an exponential backoff algorithm to the retry delay (the delay will be converted into seconds)
  • max_retry_delay (timedelta) – maximum delay interval between retries
  • start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval.
  • end_date (datetime) – if specified, the scheduler won’t go beyond this date
  • depends_on_past (bool) – when set to true, task instances will run sequentially while relying on the previous task’s schedule to succeed. The task instance for the start_date is allowed to run.
  • wait_for_downstream (bool) – when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs.
  • queue (str) – which queue to target when running this job. Not all executors implement queue management, the CeleryExecutor does support targeting specific queues.
  • dag (DAG) – a reference to the dag the task is attached to (if any)
  • priority_weight (int) – priority weight of this task against other tasks. This allows the executor to trigger higher priority tasks before others when things get backed up.
  • weight_rule (str) – weighting method used to compute the effective total priority weight of the task. Options are: { downstream | upstream | absolute }; the default is downstream. When set to downstream, the effective weight of the task is the aggregate sum of the weights of all downstream descendants; as a result, upstream tasks have higher weight and are scheduled more aggressively when using positive weight values. When set to absolute, the effective weight is the exact priority_weight specified, without additional weighting.
  • pool (str) – the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
  • sla (datetime.timedelta) – time by which the job is expected to succeed.
  • execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance; if execution goes beyond this limit, the task will raise an error and fail.
  • on_failure_callback (callable) – a function to be called when a task instance of this task fails. A context dictionary is passed as the single parameter to this function.
  • on_retry_callback (callable) – much like the on_failure_callback except that it is executed when retries occur.
  • on_success_callback (callable) – much like the on_failure_callback except that it is executed when the task succeeds.
  • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy} default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule
  • resources (dict) – A map of resource parameter names (the argument names of the Resources constructor) to their values.
  • run_as_user (str) – unix username to impersonate while running the task
  • task_concurrency (int) – When set, a task will be able to limit the concurrent runs across execution_dates
  • executor_config (dict) – additional task-level configuration parameters that are interpreted by a specific executor. Parameters are namespaced by the name of the executor.

Similarly, all the sensors inherit from the BaseSensorOperator and have the following parameters:

  • mode: How the sensor operates. There are two types of modes:
    • poke: This is the default mode. In poke mode, the sensor acquires a worker slot for the entire execution time and sleeps between pokes.
    • reschedule: In reschedule mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time.
  • poke_interval: In poke mode, this is the time in seconds that the sensor waits before checking the condition again. The default is 60 seconds.
  • exponential_backoff: When set to True, this setting creates exponentially longer wait times between pokes in poke mode.
  • timeout: The maximum amount of time in seconds that the sensor should check the condition for. If the condition has not been met when this time is reached, the task fails.
  • soft_fail: If set to True, the task is marked as skipped if the condition is not met by the timeout.

Examples of Operators:

Apache Airflow Bash Operator – Executes a bash command.

BashOperator in Apache Airflow provides a simple way to run bash commands in your workflow. This is the operator to use when your DAG needs to execute a bash command or script.

t1 = BashOperator(
        task_id='t1',
        dag=dag,
        bash_command='echo "Hello"'
        )

Apache Airflow Python Operator – Calls a python function.

The Airflow PythonOperator provides a basic yet effective operator that lets you run a Python callable function from your DAG.

def print_string():
    print("Hello")

t2 = PythonOperator(
        task_id="t2",
        dag=dag,
        python_callable=print_string,
        )

Apache Airflow Email Operator – Sends an email.

EmailOperator is the most straightforward way to send emails from Airflow. With the EmailOperator, you can send task-related emails or build an alerting system.

t4 = EmailOperator(
       task_id='t4',
       to='test@mail.com',
       subject='Alert Mail',
       html_content=""" Mail Test """,
       dag=dag
)

Apache Airflow SSH Operator

SSHOperator is used to execute commands on a given remote host, using either an ssh_hook or an SSH connection id (ssh_conn_id).

t5 = SSHOperator(
        task_id='SSHOperator',
        ssh_conn_id='ssh_connectionid',
        command='echo "Hello SSH Operator"'
    )

Apache Airflow Docker Operator

The DockerOperator executes commands inside a docker container. With the help of the Airflow Docker operator, you can store files in a temporary directory created on the host and mounted into the container.

t6 = DockerOperator(
            task_id='docker_command',
            image='centos:latest',
            api_version='auto',
            auto_remove=True,
            command="/bin/sleep 30",
            docker_url="unix://var/run/docker.sock",
            network_mode="bridge"
)

Apache Airflow HTTP Operator

The SimpleHttpOperator makes a call to an endpoint on an HTTP system to perform an action; this is useful when an API returns a big JSON payload and you are only interested in part of it. The closely related HttpSensor, shown below, pokes an endpoint until a response check succeeds.

t7 = HttpSensor(
    task_id='t7',
    http_conn_id='http_default',
    endpoint='',
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=4,
    dag=dag,
)

Conclusion

This blog explains Operators and their types in detail. However, there is much more to learn about Airflow Operators and various related concepts. Please refer to the Apache Airflow documentation on Operators for further reference. For more awesome tech blogs, please visit Knoldus Blogs.


Written by 

Agnibhas Chattopadhyay is a Software Consultant at Knoldus Inc. He is very passionate about Technology and Basketball. He is experienced in languages like C, C++, Java and Python, frameworks like Spring/Spring Boot and Apache Kafka, and CI/CD tools like Jenkins and Docker. He loves sharing knowledge by writing easy-to-understand tech blogs.