Concepts — Airflow Documentation

The Airflow platform is a tool for describing, executing, and monitoring workflows.

Core Ideas

DAGs

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but shouldn't start until a certain date.
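As a minimal sketch of such a DAG (the DAG id, commands, and exact dates are illustrative assumptions, not prescribed above):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'abc_example',                       # hypothetical DAG id
    schedule_interval='0 22 * * *',      # run every night at 10pm...
    start_date=datetime(2016, 1, 1),     # ...but not before this date
)

# A has to run successfully before B; it times out after 5 minutes
a = BashOperator(task_id='A', bash_command='echo A',
                 execution_timeout=timedelta(minutes=5), dag=dag)

# B can be restarted up to 5 times in case it fails
b = BashOperator(task_id='B', bash_command='echo B', retries=5, dag=dag)

# C can run anytime, independently of A and B
c = BashOperator(task_id='C', bash_command='echo C', dag=dag)

a >> b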

In this way, a DAG describes how you want to carry out your workflow; but notice that we haven't said anything about what we actually want to do! A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends an email. Or perhaps A monitors your location so B can open your garage door while C turns on your house lights. The important thing is that the DAG isn't concerned with what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues.

DAGs are defined in standard Python files that are placed in Airflow's DAG_FOLDER. Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.

Note

When searching for DAGs, Airflow only considers python files that contain the strings “airflow” and “DAG” by default. To consider all python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.

Scope

Airflow will load any DAG object it can import from a DAG file. Critically, that means the DAG must appear in globals(). Consider the following two DAGs. Only dag_1 will be loaded; the other one only appears in a local scope.

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

Sometimes this can be put to good use. For example, a common pattern with SubDagOperator is to define the subdag inside a function so that Airflow doesn't try to load it as a standalone DAG.

Default Arguments

If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'airflow'
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner)  # Airflow

Context Manager

Added in Airflow 1.8

DAGs can be used as context managers to automatically assign new operators to that DAG.

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag  # True

DAG Runs

A DAG run is a physical instance of a DAG, containing task instances that run for a specific execution_date.

A DAG run is usually created by the Airflow scheduler, but can also be created by an external trigger. Multiple DAG runs may be running at once for a particular DAG, each of them having a different execution_date. For example, we might currently have two DAG runs that are in progress for 2016-01-01 and 2016-01-02 respectively.

execution_date

The execution_date is the logical date and time which the DAG Run, and its task instances, are running for.

This allows task instances to process data for the desired logical date & time. While a task_instance or DAG run might have a physical start date of now, their logical date might be 3 months ago because we are busy reloading something.

In the prior example the execution_date was 2016-01-01 for the first DAG Run and 2016-01-02 for the second.

A DAG run and all task instances created within it are instanced with the same execution_date, so that logically you can think of a DAG run as simulating the DAG running all of its tasks at some previous date & time specified by the execution_date.
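For illustration only (the function and task names are hypothetical, and a dag object is assumed as in the other snippets on this page), a task can read its run's execution_date from the template context and use it to select the data it should process:

from airflow.operators.python_operator import PythonOperator

def process_partition(**context):
    # The logical date of this DAG run, not the wall-clock time it started
    logical_date = context['execution_date']
    print("Processing data for %s" % logical_date.strftime('%Y-%m-%d'))

process = PythonOperator(
    task_id='process_partition',
    python_callable=process_partition,
    provide_context=True,   # Airflow 1.x: pass execution_date & friends as keyword arguments
    dag=dag,
)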

Tasks

A Task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python.

Each task is an implementation of an Operator, for example a PythonOperator to execute some Python code, or a BashOperator to run a Bash command.

The task implements an operator by defining specific values for that operator, such as a Python callable in the case of PythonOperator or a Bash command in the case of BashOperator.
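As a small sketch (task ids, the command, and the callable are illustrative; a dag object is assumed as elsewhere on this page):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# The operator defines *how* work runs; the values below define *this* task.
say_hello = BashOperator(
    task_id='say_hello',
    bash_command='echo "hello"',
    dag=dag,
)

def _compute():
    return 40 + 2

compute = PythonOperator(
    task_id='compute',
    python_callable=_compute,
    dag=dag,
)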

Relations between Tasks

Consider the following DAG with two tasks. Each task is a node in our DAG, and there is a dependency from task_1 to task_2:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    task_1 = DummyOperator('task_1')
    task_2 = DummyOperator('task_2')

    task_1 >> task_2  # Define dependencies

We can say that task_1 is upstream of task_2, and conversely task_2 is downstream of task_1. When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.

Task Instances

A task instance represents a specific run of a task and is characterized as the combination of a DAG, a task, and a point in time (execution_date). Task instances also have an indicative state, which could be “running”, “success”, “failed”, “skipped”, “up for retry”, etc.

Tasks are defined in DAGs, and both are written in Python code to define what you want to do. Task Instances belong to DAG Runs, have an associated execution_date, and are instantiated, runnable entities.

Relations between Task Instances

Again consider the following tasks, defined for some DAG:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    task_1 = DummyOperator('task_1')
    task_2 = DummyOperator('task_2')

    task_1 >> task_2  # Define dependencies

When we enable this DAG, the scheduler creates several DAG Runs - one with execution_date of 2016-01-01, one with execution_date of 2016-01-02, and so on up to the current date.

Each DAG Run will contain a task_1 Task Instance and a task_2 Task Instance. Both Task Instances will have execution_date equal to the DAG Run's execution_date, and each task_2 will be downstream of (depends on) its task_1.

We can also say that task_1 for 2016-01-01 is the previous task instance of the task_1 for 2016-01-02, or that the DAG Run for 2016-01-01 is the previous DAG Run to the DAG Run of 2016-01-02. Here, previous refers to the logically prior execution_date, which runs independently of other runs, while upstream refers to a dependency within the same run, i.e. on a task with the same execution_date.

Note

The Airflow documentation sometimes refers to previous instead of upstream in places, and vice-versa. If you find any occurrences of this, please help us improve by contributing some corrections!

Task Lifecycle

A task goes through various stages from start to completion. In the Airflow UI (graph and tree views), these stages are displayed by a color representing each stage:

[Image: color legend of task instance states]

The complete lifecycle of the task looks like this:

[Image: task lifecycle state diagram]

The happy flow consists of the following stages:

  1. No status (scheduler created empty task instance)

  2. Scheduled (scheduler determined task instance needs to run)

  3. Queued (scheduler sent task to executor to run on the queue)

  4. Running (worker picked up a task and is now running it)

  5. Success (task completed)

There is also a visual difference between scheduled and manually triggered DAGs/tasks:

[Image: scheduled vs. manually triggered runs in the UI]

The DAGs/tasks with a black border are scheduled runs, whereas the non-bordered DAGs/tasks are manually triggered, i.e. by airflow trigger_dag.

Operators

While DAGs describe how to run a workflow, Operators determine what actually gets done by a task.

An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don't need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.

This is a subtle but very important point: in general, if two operators need to share information, like a filename or a small amount of data, you should consider combining them into a single operator. If it absolutely can't be avoided, Airflow does have a feature for operator cross-communication called XCom that is described in the section XComs.

Airflow provides operators for many common tasks, including:

  • BashOperator - executes a bash command

  • PythonOperator - calls an arbitrary Python function

  • EmailOperator - sends an email

  • SimpleHttpOperator - sends an HTTP request

  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command

  • Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…

In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator… you get the idea!

Operators are only loaded by Airflow if they are assigned to a DAG.

See also

  • List Airflow operators

  • How-to guides for some Airflow operators.

DAG Assignment

Added in Airflow 1.8

Operators do not have to be assigned to DAGs immediately (previously dag was a required argument). However, once an operator is assigned to a DAG, it can not be transferred or unassigned. DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators.

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

Bitshift Composition

Added in Airflow 1.8

We recommend setting operator relationships with bitshift operators rather than set_upstream() and set_downstream().

Traditionally, operator relationships are set with the set_upstream() and set_downstream() methods. In Airflow 1.8, this can be done with the Python bitshift operators >> and <<. The following four statements are all functionally equivalent:

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

When using the bitshift to compose operators, the relationship is set in the direction that the bitshift operator points. For example, op1 >> op2 means that op1 runs first and op2 runs second. Multiple operators can be composed – keep in mind the chain is executed left-to-right and the rightmost object is always returned. For example:

op1 >> op2 >> op3 << op4

is equivalent to:

op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)

For convenience, the bitshift operators can also be used with DAGs. For example:

dag >> op1 >> op2

is equivalent to:

op1.dag = dag
op1.set_downstream(op2)

We can put this all together to build a simple pipeline:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )

Bitshift can also be used with lists. For example:

op1 >> [op2, op3] >> op4

is equivalent to:

op1 >> op2 >> op4
op1 >> op3 >> op4

and equivalent to:

op1.set_downstream([op2, op3])

Relationship Builders

Moved in Airflow 2.0

In Airflow 2.0 those two methods moved from airflow.utils.helpers to airflow.models.baseoperator.

The chain and cross_downstream functions provide easier ways to set relationships between operators in specific situations.

When setting a relationship between two lists, if we want all operators in one list to be upstream to all operators in the other, we cannot use a single bitshift composition. Instead we have to split one of the lists:

[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6

cross_downstream makes this kind of list-to-list relationship easier to express:

cross_downstream([op1, op2, op3], [op4, op5, op6])

When setting single direction relationships to many operators, we could concatenate them with bitshift composition:

op1 >> op2 >> op3 >> op4 >> op5

This can be accomplished using chain:

chain(op1, op2, op3, op4, op5)

even without the operators' names:

chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 6)])

chain can also handle a list of operators:

chain(op1, [op2, op3], op4)

is equivalent to:

op1 >> [op2, op3] >> op4

When chain sets relationships between two lists of operators, they must have the same size.

chain(op1, [op2, op3], [op4, op5], op6)

is equivalent to:

op1 >> [op2, op3]
op2 >> op4
op3 >> op5
[op4, op5] >> op6

Workflows

You're now familiar with the core building blocks of Airflow. Some of the concepts may sound very similar, but the vocabulary can be conceptualized like this:

  • DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.

  • DAG Run: An instance of a DAG for a particular logical date and time.

  • Operator: A class that acts as a template for carrying out some work.

  • Task: Defines work by implementing an operator, written in Python.

  • Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e. for a specific execution_date).

  • execution_date: The logical date and time for a DAG Run and its Task Instances.

By combining DAGs and Operators to create TaskInstances, you can build complex workflows.

Additional Functionality

In addition to the core Airflow objects, there are a number of more complex features that enable behaviors like limiting simultaneous access to resources, cross-communication, conditional execution, and more.

Hooks

Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.connection.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.

Hooks are also very useful on their own to use in Python scripts, in the airflow.operators.PythonOperator, and in interactive environments like iPython or Jupyter Notebook.
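As a hedged illustration (the conn_id my_postgres and the query are assumptions, not anything defined above), a hook can be used directly from a Python callable or an interactive session:

from airflow.hooks.postgres_hook import PostgresHook

# 'my_postgres' is a hypothetical conn_id configured under Admin -> Connections
hook = PostgresHook(postgres_conn_id='my_postgres')

# The hook resolves hostname and credentials from the Connection,
# so no secrets appear in pipeline code
rows = hook.get_records('SELECT count(*) FROM my_table')
print(rows)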

See also

List Airflow hooks

Pools

Some systems can get overwhelmed when too many processes hit them at the same time. Airflow pools can be used to limit the execution parallelism on arbitrary sets of tasks. The list of pools is managed in the UI (Menu -> Admin -> Pools) by giving the pools a name and assigning it a number of worker slots. Tasks can then be associated with one of the existing pools by using the pool parameter when creating tasks (i.e., instantiating operators).

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)

aggregate_db_message_job.set_upstream(wait_for_empty_queue)

The pool parameter can be used in conjunction with priority_weight to define priorities in the queue, and which tasks get executed first as slots open up in the pool. The default priority_weight is 1, and can be bumped to any number. When sorting the queue to evaluate which task should be executed next, we use the priority_weight, summed up with all of the priority_weight values from tasks downstream from this task. You can use this to bump a specific important task and the whole path to that task gets prioritized accordingly.

Tasks will be scheduled as usual while the slots fill up. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI. As slots free up, queued tasks start running based on the priority_weight (of the task and its descendants).
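For instance, a minimal sketch (the task ids and the pool name are illustrative, and a dag object is assumed as in the example above) of two tasks sharing a pool, one bumped to run first when slots are scarce:

from airflow.operators.dummy_operator import DummyOperator

low = DummyOperator(
    task_id='low_priority_report',
    pool='reporting_db',     # hypothetical pool created under Admin -> Pools
    priority_weight=1,       # the default
    dag=dag,
)

high = DummyOperator(
    task_id='high_priority_report',
    pool='reporting_db',
    priority_weight=10,      # picked first when the pool has a free slot
    dag=dag,
)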

Pools are not thread-safe: when more than one scheduler runs in LocalExecutor mode, you cannot guarantee that a task will not be scheduled even if the pool is full.

Note that if tasks are not given a pool, they are assigned to a default pool default_pool. default_pool is initialized with 128 slots and can be changed through the UI or CLI (though it cannot be removed).

To combine Pools with SubDAGs see the SubDAGs section.

Connections

The information needed to connect to external systems is stored in the Airflow metastore database and can be managed in the UI (Menu -> Admin -> Connections). A conn_id is defined there, and hostname / login / password / schema information attached to it. Airflow pipelines retrieve centrally-managed connections information by specifying the relevant conn_id.

You may add more than one connection with the same conn_id. When there is more than one connection with the same conn_id, the get_connection() method on BaseHook will choose one connection randomly. This can be used to provide basic load balancing and fault tolerance, when used in conjunction with retries.

Airflow also provides a mechanism to store connections outside the database, e.g. in environment variables. Additional sources may be enabled, e.g. AWS SSM Parameter Store, or you may roll your own secrets backend.

Many hooks have a default conn_id, where operators using that hook do not need to supply an explicit connection ID. For example, the default conn_id for the PostgresHook is postgres_default.
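As a small sketch (the conn_id my_api is assumed purely for illustration), pipeline code can look up a centrally-managed connection through BaseHook:

from airflow.hooks.base_hook import BaseHook

# Resolves host / login / password / schema stored under Admin -> Connections
conn = BaseHook.get_connection('my_api')
base_url = 'https://{}:{}'.format(conn.host, conn.port)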

See Managing Connections for details on creating and managing connections.

Queues

When using the CeleryExecutor, the Celery queues that tasks are sent to can be specified. queue is an attribute of BaseOperator, so any task can be assigned to any queue. The default queue for the environment is defined in the airflow.cfg's celery -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.

Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow worker), a set of comma-delimited queue names can be specified (e.g. airflow worker -q spark). This worker will then only pick up tasks wired to the specified queue(s).

This can be useful if you need specialized workers, either from a resource perspective (for say very lightweight tasks where one worker could take thousands of tasks without a problem), or from an environment perspective (you want a worker running from within the Spark cluster itself because it needs a very specific environment and security rights).
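A minimal sketch (the spark queue name, the command, and the dag object are assumptions): pinning a task to a queue is just a matter of setting the queue attribute, and only workers started with that queue will pick it up:

from airflow.operators.bash_operator import BashOperator

# Only workers started with `airflow worker -q spark` will execute this task
submit_job = BashOperator(
    task_id='submit_spark_job',
    bash_command='spark-submit my_job.py',   # hypothetical command
    queue='spark',
    dag=dag,
)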

XComs

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

XComs can be “pushed” (sent) or “pulled” (received). When a task pushes an XCom, it makes it generally available to other tasks. Tasks can push XComs at any time by calling the xcom_push() method. In addition, if a task returns a value (either from its Operator's execute() method, or from a PythonOperator's python_callable function), then an XCom containing that value is automatically pushed.

Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key, source task_ids, and source dag_id. By default, xcom_pull() filters for the keys that are automatically given to XComs when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).

If xcom_pull is passed a single string for task_ids, then the most recent XCom value from that task is returned; if a list of task_ids is passed, then a corresponding list of XCom values is returned.

# inside a PythonOperator called 'pushing_task'
def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')

It is also possible to pull an XCom directly in a template. Here's an example of what this may look like:

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

Note that XComs are similar to Variables, but are specifically designed for inter-task communication rather than global settings.

Custom XCom backend

It is possible to change how XComs serialize and deserialize tasks' results. To do this, set the xcom_backend parameter in the Airflow config to a class that is a subclass of BaseXCom. To alter the serialization / deserialization mechanism, the custom class should override the serialize_value and deserialize_value methods.
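A minimal sketch of such a backend (the class name is illustrative, it assumes JSON-serializable values, and it assumes an Airflow version where BaseXCom is importable from airflow.models.xcom):

import json

from airflow.models.xcom import BaseXCom


class JsonXComBackend(BaseXCom):
    """Hypothetical backend that stores XCom values as JSON instead of pickles."""

    @staticmethod
    def serialize_value(value):
        # Called when an XCom is pushed; must return something the metadata DB can store
        return json.dumps(value).encode('utf-8')

    @staticmethod
    def deserialize_value(result):
        # Called when an XCom is pulled; `result` is the stored XCom row
        return json.loads(result.value)

The xcom_backend setting in airflow.cfg would then point at the dotted path of this class.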

Variables

Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. Variables can be listed, created, updated and deleted from the UI (Admin -> Variables), code or CLI. In addition, json settings files can be bulk uploaded through the UI. While your pipeline code definition and most of your constants and variables should be defined in code and stored in source control, it can be useful to have some variables or configuration items accessible and modifiable through the UI.

from airflow.models import Variable

foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
baz = Variable.get("baz", default_var=None)

The second call assumes json content and will be deserialized into bar. Note that Variable is a sqlalchemy model and can be used as such. The third call uses the default_var parameter with the value None, which either returns an existing value or None if the variable isn't defined. The get function will throw a KeyError if the variable doesn't exist and no default is provided.

You can use a variable from a Jinja template with the syntax:

echo {{ var.value.<variable_name> }}

or if you need to deserialize a json object from the variable:

echo {{ var.json.<variable_name> }}

Storing Variables in Environment Variables

New in version 1.10.10.

Airflow Variables can also be created and managed using Environment Variables. The environment variable naming convention is AIRFLOW_VAR_<variable_name>, all uppercase. So if your variable key is FOO then the variable name should be AIRFLOW_VAR_FOO.

For example,

export AIRFLOW_VAR_FOO=BAR

# To use JSON, store them as JSON strings
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'

You can use them in your DAGs as:

from airflow.models import Variable

foo = Variable.get("foo")
foo_json = Variable.get("foo_baz", deserialize_json=True)

Note

Single underscores surround VAR. This is in contrast with the way airflow.cfg parameters are stored, where double underscores surround the config section name. Variables set using Environment Variables will not appear in the Airflow UI, but you will be able to use them in your DAG file.

Branching

Sometimes you need a workflow to branch, or only go down a certain path based on an arbitrary condition which is typically related to something that happened in an upstream task. One way to do this is by using the BranchPythonOperator.

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.

Note that using tasks with depends_on_past=True downstream from BranchPythonOperator is logically unsound, as a skipped status will invariably block tasks that depend on their past successes. The skipped state propagates to tasks whose directly upstream tasks are all skipped.

Note that when a path is a downstream task of the returned task (list), it will not be skipped:

[Image: branching example with branch_a, branch_b and join]

Paths of the branching task are branch_a, join and branch_b. Since join is a downstream task of branch_a, it will be excluded from the skipped tasks when branch_a is returned by the Python callable.

The BranchPythonOperator can also be used with XComs, allowing the branch to follow to be decided dynamically based on data from upstream tasks. For example:

def branch_func(**kwargs):
    ti = kwargs['ti']
    xcom_value = int(ti.xcom_pull(task_ids='start_task'))
    if xcom_value >= 5:
        return 'continue_task'
    else:
        return 'stop_task'

start_op = BashOperator(
    task_id='start_task',
    bash_command="echo 5",
    xcom_push=True,
    dag=dag)

branch_op = BranchPythonOperator(
    task_id='branch_task',
    provide_context=True,
    python_callable=branch_func,
    dag=dag)

continue_op = DummyOperator(task_id='continue_task', dag=dag)
stop_op = DummyOperator(task_id='stop_task', dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

If you wish to implement your own operators with branching functionality, you can inherit from BaseBranchOperator, which behaves similarly to BranchPythonOperator but expects you to provide an implementation of the method choose_branch. As with the callable for BranchPythonOperator, this method should return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped.

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run an extra branch on the first day of the month
        """
        if context['execution_date'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        else:
            return 'daily_task_id'

SubDAGs

SubDAGs are perfect for repeating patterns. Defining a function that returns a DAG object is a nice design pattern when using Airflow.

Airbnb uses the stage-check-exchange pattern when loading data. Data is staged in a temporary table, after which data quality checks are performed against that table. Once the checks all pass the partition is moved into the production table.

As another example, consider the following DAG:

[Image: example DAG with many parallel task-* operators]

We can combine all of the parallel task-* operators into a single SubDAG, so that the resulting DAG resembles the following:

[Image: the same DAG with the parallel tasks collapsed into SubDAGs]

Note that SubDAG operators should contain a factory method that returns a DAG object. This will prevent the SubDAG from being treated like a separate DAG in the main UI. For example:

airflow/example_dags/subdags/subdag.py

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )

    for i in range(5):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, i + 1),
            default_args=args,
            dag=dag_subdag,
        )
    return dag_subdag

This SubDAG can then be referenced in your main DAG file:

airflow/example_dags/example_subdag_operator.py

from airflow.example_dags.subdags.subdag import subdag
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example'])

start = DummyOperator(
    task_id='start',
    dag=dag,
)

section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(DAG_NAME, 'section-1', args),
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

section_2 = SubDagOperator(
    task_id='section-2',
    subdag=subdag(DAG_NAME, 'section-2', args),
    dag=dag,
)

end = DummyOperator(
    task_id='end',
    dag=dag,
)

start >> section_1 >> some_other_task >> section_2 >> end

You can zoom into a SubDagOperator from the graph view of the main DAG to show the tasks contained within the SubDAG:

[Image: zoomed-in view of the tasks inside a SubDagOperator]

Some other tips when using SubDAGs:

  • by convention, a SubDAG's dag_id should be prefixed by its parent and a dot, as in parent.child

  • share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)

  • SubDAGs must have a schedule and be enabled. If the SubDAG's schedule is set to None or @once, the SubDAG will succeed without having done anything

  • clearing a SubDagOperator also clears the state of the tasks within

  • marking success on a SubDagOperator does not affect the state of the tasks within

  • refrain from using depends_on_past=True in tasks within the SubDAG as this can be confusing

  • it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot

See airflow/example_dags for a demonstration.

Note that the Airflow pool is not honored by SubDagOperator; hence resources could be over-consumed by SubDagOperators.

SLAs

Service Level Agreements, or the time by which a task or DAG should have succeeded, can be set at a task level as a timedelta. If one or many instances have not succeeded by that time, an alert email is sent detailing the list of tasks that missed their SLA. The event is also recorded in the database and made available in the web UI under Browse->SLA Misses where events can be analyzed and documented.

SLAs can be configured for scheduled tasks by using the sla parameter. In addition to sending alerts to the addresses specified in a task's email parameter, the sla_miss_callback specifies an additional Callable object to be invoked when the SLA is not met.
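A minimal sketch (the DAG id, callback body, email address and the one-hour SLA are illustrative assumptions):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def notify_sla_miss(*args, **kwargs):
    # Invoked by the scheduler with details of the missed SLAs
    print("SLA was missed")


dag = DAG(
    'sla_example',                      # hypothetical DAG id
    schedule_interval='@daily',
    start_date=datetime(2016, 1, 1),
    sla_miss_callback=notify_sla_miss,
)

report = BashOperator(
    task_id='build_report',
    bash_command='sleep 5',
    email=['alerts@example.com'],       # hypothetical recipients for the SLA miss email
    sla=timedelta(hours=1),             # task should have succeeded within 1 hour of the schedule
    dag=dag,
)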

If you don't want to check SLAs, you can disable them globally (for all the DAGs) by setting check_slas=False in the [core] section of the airflow.cfg file:

[core]
check_slas = False

Note

For information on the email configuration, see Email Configuration

Trigger Rules

Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.

All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success and can be defined as “trigger this task when all directly upstream tasks have succeeded”. All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:

  • all_success: (default) all parents have succeeded

  • all_failed: all parents are in a failed or upstream_failed state

  • all_done: all parents are done with their execution

  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

  • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

  • none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded.

  • none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state

  • dummy: dependencies are just for show, trigger at will

Note that these can be used in conjunction with depends_on_past (boolean) that, when set to True, keeps a task from getting triggered if the previous schedule for the task hasn't succeeded.
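For example (task names are illustrative and a dag object is assumed as elsewhere on this page), a cleanup step that should run no matter how its parents finished can use the all_done rule from the list above:

from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

extract = DummyOperator(task_id='extract', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# Runs once extract and load are done, whether they succeeded, failed or were skipped
cleanup = DummyOperator(
    task_id='cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)

[extract, load] >> cleanup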

One must be aware of the interaction between trigger rules and skipped tasks, especially tasks that are skipped at the schedule level. Skipped tasks will cascade through the trigger rules all_success and all_failed, but not through all_done, one_failed, one_success, none_failed, none_failed_or_skipped, none_skipped and dummy.

For example, consider the following DAG:

# dags/branch_without_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(
    dag_id='branch_without_trigger',
    schedule_interval='@once',
    start_date=dt.datetime(2019, 2, 28)
)

run_this_first = DummyOperator(task_id='run_this_first', dag=dag)
branching = BranchPythonOperator(
    task_id='branching', dag=dag,
    python_callable=lambda: 'branch_a')

branch_a = DummyOperator(task_id='branch_a', dag=dag)
follow_branch_a = DummyOperator(task_id='follow_branch_a', dag=dag)

branch_false = DummyOperator(task_id='branch_false', dag=dag)

join = DummyOperator(task_id='join', dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

In the case of this DAG, join is downstream of follow_branch_a and branch_false. The join task will show up as skipped because its trigger_rule is set to all_success by default, and skipped tasks will cascade through all_success.

[Image: graph view with the join task skipped]

By setting trigger_rule to none_failed_or_skipped in the join task,

# dags/branch_with_trigger.py
...
join = DummyOperator(task_id='join', dag=dag, trigger_rule='none_failed_or_skipped')
...

the join task will be triggered as soon as branch_false has been skipped (a valid completion state) and follow_branch_a has succeeded, because skipped tasks will not cascade through none_failed_or_skipped.

[Image: graph view with the join task triggered]

Latest Run Only

Standard workflow behavior involves running a series of tasks for a particular date/time range. Some workflows, however, perform tasks that are independent of run time but need to be run on a schedule, much like a standard cron job. In these cases, backfills or running jobs missed during a pause just wastes CPU cycles.

For situations like this, you can use the LatestOnlyOperator to skip tasks that are not being run during the most recent scheduled run for a DAG. The LatestOnlyOperator skips all downstream tasks if the time right now is not between its execution_time and the next scheduled execution_time.

For example, consider the following DAG:

airflow/example_dags/example_latest_only_with_trigger.py

import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=days_ago(2),
    tags=['example'])

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)

latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]

In the case of this DAG, the latest_only task will show up as skipped for all runs except the latest run. task1 is directly downstream of latest_only and will also skip for all runs except the latest. task2 is entirely independent of latest_only and will run in all scheduled periods. task3 is downstream of task1 and task2 and, because of the default trigger_rule being all_success, will receive a cascaded skip from task1. task4 is downstream of task1 and task2. It will be skipped directly by the LatestOnlyOperator, even though its trigger_rule is set to all_done.

[Image: graph view of the latest_only_with_trigger example runs]

Zombies & Undeads

Task instances die all the time, usually as part of their normal life cycle,but sometimes unexpectedly.

Zombie tasks are characterized by the absence of a heartbeat (emitted by the job periodically) and a running status in the database. They can occur when a worker node can't reach the database, when Airflow processes are killed externally, or when a node gets rebooted, for instance. Zombie killing is performed periodically by the scheduler's process.

Undead processes are characterized by the existence of a process and a matching heartbeat, but Airflow isn't aware of this task as running in the database. This mismatch typically occurs as the state of the database is altered, most likely by deleting rows in the “Task Instances” view in the UI. Tasks are instructed to verify their state as part of the heartbeat routine, and terminate themselves upon figuring out that they are in this “undead” state.

Cluster Policy

Cluster policies provide an interface for taking action on every Airflow task either at DAG load time or just before task execution.

Cluster Policies for Task Mutation

In case you want to apply cluster-wide mutations to the Airflow tasks, you can either mutate the task right after the DAG is loaded or mutate the task instance before task execution.

Mutate tasks after DAG loaded

To mutate the task right after the DAG is parsed, you can define a policy function in airflow_local_settings.py that mutates the task based on other task or DAG attributes (through task.dag). It receives a single argument as a reference to the task object and you can alter its attributes.

For example, this function could apply a specific queue property when using a specific operator, or enforce a task timeout policy, making sure that no tasks run for more than 48 hours. Here's an example of what this may look like inside your airflow_local_settings.py:

def policy(task):
    if task.__class__.__name__ == 'HivePartitionSensor':
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)

Please note that the cluster policy takes precedence over task attributes defined in the DAG, meaning that if task.sla is defined in the DAG and also mutated via cluster policy, the latter will take precedence.

Mutate task instances before task execution

To mutate the task instance before the task execution, you can define a task_instance_mutation_hook function in airflow_local_settings.py that mutates the task instance.

For example, this function re-routes the task to execute in a different queue during retries:

def task_instance_mutation_hook(ti):
    if ti.try_number >= 1:
        ti.queue = 'retry_queue'

Cluster Policies for Custom Task Checks

You may also use Cluster Policies to apply cluster-wide checks on Airflow tasks. You can raise AirflowClusterPolicyViolation in a policy or task mutation hook (described above) to prevent a DAG from being imported or prevent a task from being executed if the task is not compliant with your check.

These checks are intended to help teams using Airflow to protect against common beginner errors that may get past a code reviewer, rather than as technical security controls.

For example, don’t run tasks without airflow owners:

def task_must_have_owners(task):
    if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
        raise AirflowClusterPolicyViolation(
            'Task must have non-None non-default owner. Current value: {}'.format(task.owner))

If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs multiple of these custom checks and aggregates the various error messages so that a single AirflowClusterPolicyViolation can be reported in the UI (and import errors table in the database).

For example, in airflow_local_settings.py:

TASK_RULES = [task_must_have_owners]  # type: List[Callable[[BaseOperator], None]]


def _check_task_rules(current_task):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            "DAG policy violation (DAG ID: {0}, Path: {1}):\n"
            "Notices:\n"
            "{2}".format(current_task.dag_id, current_task.dag.filepath, notices_list))


def cluster_policy(task):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)

Where to put airflow_local_settings.py?

Add an airflow_local_settings.py file to your $PYTHONPATH or to the $AIRFLOW_HOME/config folder.

Documentation & Notes

It's possible to add documentation or notes to your DAGs & task objects that become visible in the web interface (“Graph View” & “Tree View” for DAGs, “Task Details” for tasks). There are a set of special task attributes that get rendered as rich content if defined:

attribute    rendered to

doc          monospace
doc_json     json
doc_yaml     yaml
doc_md       markdown
doc_rst      reStructuredText

Please note that for DAGs, doc_md is the only attribute interpreted.

This is especially useful if your tasks are built dynamically from configuration files; it allows you to expose the configuration that led to the related tasks in Airflow.

"""### My great DAG"""dag = DAG('my_dag', default_args=default_args)dag.doc_md = __doc__t = BashOperator("foo", dag=dag)t.doc_md = """\#Title"Here's a [url](www.airbnb.com)"""

This content will get rendered as markdown respectively in the “Graph View” and“Task Details” pages.

Jinja Templating

Airflow leverages the power of Jinja Templating and this can be a powerful tool to use in combination with macros (see the Macros reference section).

For example, say you want to pass the execution date as an environment variable to a Bash script using the BashOperator.

# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id='test_env',
    bash_command='/tmp/test.sh ',
    dag=dag,
    env={'EXECUTION_DATE': date})

Here, {{ ds }} is a macro, and because the env parameter of the BashOperator is templated with Jinja, the execution date will be available as an environment variable named EXECUTION_DATE in your Bash script.

You can use Jinja templating with every parameter that is marked as “templated” in the documentation. Template substitution occurs just before the pre_execute function of your operator is called.

You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in the template_fields property will be submitted to template substitution, like the path field in the example below:

class MyDataReader:
    template_fields = ['path']

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]

t = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=[MyDataReader('/tmp/{{ ds }}/my_file')],
    dag=dag)

Note

The template_fields property can equally be a class variable or an instance variable.

Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:

class MyDataTransformer:
    template_fields = ['reader']

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]

class MyDataReader:
    template_fields = ['path']

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]

t = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader('/tmp/{{ ds }}/my_file'))],
    dag=dag)

You can pass custom options to the Jinja Environment when creating your DAG. One common usage is to prevent Jinja from dropping a trailing newline from a template string:

my_dag = DAG(
    dag_id='my-dag',
    jinja_environment_kwargs={
        'keep_trailing_newline': True,
        # some other jinja2 Environment options here
    }
)

See the Jinja documentation to find all available options.

Exceptions

Airflow defines a number of exceptions; most of these are used internally, but a few are relevant to authors of custom operators or python callables called from PythonOperator tasks. Normally any exception raised from an execute method or python callable will either cause a task instance to fail if it is not configured to retry or has reached its limit on retry attempts, or to be marked as “up for retry”. A few exceptions can be used when different behavior is desired:

  • AirflowSkipException can be raised to set the state of the current task instance to “skipped”

  • AirflowFailException can be raised to set the state of the current task to “failed” regardless of whether there are any retry attempts remaining.

This example illustrates some possibilities:

from airflow.exceptions import AirflowFailException, AirflowSkipException


def fetch_data():
    try:
        data = get_some_data(get_api_key())
        if not data:
            # Set state to skipped and do not retry
            # Downstream task behavior will be determined by trigger rules
            raise AirflowSkipException("No data available.")
    except Unauthorized:
        # If we retry, our api key will still be bad, so don't waste time retrying!
        # Set state to failed and move on
        raise AirflowFailException("Our api key is bad!")
    except TransientError:
        print("Looks like there was a blip.")
        # Raise the exception and let the task retry unless max attempts were reached
        raise
    handle(data)


task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)

See also

  • List of Airflow exceptions

Packaged DAGs

While you will often specify DAGs in a single .py file, it might sometimes be necessary to combine a DAG with its dependencies. For example, you might want to combine several DAGs so they can be versioned and managed together, or you might need an extra module that is not available by default on the system you are running Airflow on. To allow this, you can create a zip file that contains the DAG(s) in the root of the zip file and has the extra modules unpacked in directories.

For instance, you can create a zip file that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.

In case you would like to add module dependencies to your DAG you basically would do the same, but then it is more suitable to use a virtualenv and pip.

virtualenv zip_dag
source zip_dag/bin/activate

mkdir zip_dag_contents
cd zip_dag_contents

pip install --install-option="--install-lib=$PWD" my_useful_package
cp ~/my_dag.py .

zip -r zip_dag.zip *

Note

the zip file will be inserted at the beginning of the module search list (sys.path) and as such it will be available to any other code that resides within the same interpreter.

Note

packaged dags cannot be used with pickling turned on.

Note

packaged dags cannot contain dynamic libraries (e.g. libz.so); these need to be available on the system if a module needs them. In other words, only pure Python modules can be packaged.

.airflowignore

A .airflowignore file specifies the directories or files in DAG_FOLDER that Airflow should intentionally ignore. Each line in .airflowignore specifies a regular expression pattern, and directories or files whose names (not DAG id) match any of the patterns would be ignored (under the hood, re.findall() is used to match the pattern). Overall it works like a .gitignore file. Use the # character to indicate a comment; all characters on a line following a # will be ignored.

The .airflowignore file should be put in your DAG_FOLDER. For example, you can prepare a .airflowignore file with the contents:

project_a
tenant_[\d]

Then files like project_a_dag_1.py, TESTING_project_a.py, tenant_1.py, project_a/dag_1.py, and tenant_1/dag_1.py in your DAG_FOLDER would be ignored (if a directory's name matches any of the patterns, this directory and all its subfolders would not be scanned by Airflow at all. This improves efficiency of DAG finding).

The scope of a .airflowignore file is the directory it is in plus all its subfolders. You can also prepare a .airflowignore file for a subfolder in DAG_FOLDER and it would only be applicable for that subfolder.
