DAG writing best practices in Apache Airflow
Because Airflow is 100% code, knowing the basics of Python is all it takes to get started writing DAGs. However, writing DAGs that are efficient, secure, and scalable requires some Airflow-specific finesse. In this guide, you'll learn how you can develop DAGs that make the most of what Airflow has to offer.
In general, best practices fall into one of two categories:
- DAG design
- Using Airflow as an orchestrator
For an in-depth walkthrough and examples of some of the concepts covered in this guide, it's recommended that you review the DAG Writing Best Practices in Apache Airflow webinar and the GitHub repo for DAG examples.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Airflow operators. See Operators 101.
Review idempotency
Idempotency is the foundation for many computing practices, including the Airflow best practices in this guide. A program is considered idempotent if, for a set input, running the program once has the same effect as running the program multiple times.
In the context of Airflow, a DAG is considered idempotent if rerunning the same DAG Run with the same inputs multiple times has the same effect as running it only once. This can be achieved by designing each individual task in your DAG to be idempotent. Designing idempotent DAGs and tasks decreases recovery time from failures and prevents data loss.
Idempotency paves the way for one of Airflow's most useful features: Retries.
Set retries
In a distributed environment where task containers are executed on shared hosts, it's possible for tasks to be killed off unexpectedly. When this happens, you might see a zombie process in the Airflow logs.
You can resolve issues like zombies by using task retries. Retries can be set at different levels with the following precedence:
- Tasks: Pass the retries parameter to the task's operator.
- DAGs: Include retries in a DAG's default_args object.
- Deployments: Set the environment variable AIRFLOW__CORE__DEFAULT_TASK_RETRIES.
Setting retries to 2 will protect a task from most problems common to distributed environments. For more on using retries, see Rerun DAGs and Tasks.
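As an illustration, the following is a minimal sketch (the DAG and task names are hypothetical) that sets retries for all tasks through default_args and overrides the value for a single task:

from airflow.decorators import dag, task
from pendulum import datetime, duration

default_args = {
    "retries": 2,  # applied to every task in the DAG
    "retry_delay": duration(minutes=5),
}

@dag(
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
)
def retry_example_dag():
    @task(retries=3)  # task-level setting takes precedence over default_args
    def call_flaky_api():
        ...

    call_flaky_api()

retry_example_dag()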
DAG design
The following DAG design principles will help to make your DAGs idempotent, efficient, and readable.
Keep tasks atomic
When organizing your pipeline into individual tasks, each task should be responsible for one operation that can be re-run independently of the others. In an atomized task, a success in part of the task means a success of the entire task.
For example, in an ETL pipeline you would ideally want your Extract, Transform, and Load operations covered by three separate tasks. Atomizing these tasks allows you to rerun each operation in the pipeline independently, which supports idempotence.
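For instance, a minimal sketch of this pattern using the TaskFlow API could look like the following; the task logic and data are placeholders:

from airflow.decorators import dag, task
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def atomic_etl():
    @task
    def extract():
        # pull raw records from the source system
        return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]

    @task
    def transform(records):
        # apply transformations without rerunning the extract
        return [{**r, "value": r["value"] * 2} for r in records]

    @task
    def load(records):
        # write results to the destination; can be rerun independently
        print(records)

    load(transform(extract()))

atomic_etl()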
Use template fields, variables, and macros
By using templated fields in Airflow, you can pull values into DAGs using environment variables and Jinja templating. Compared to using Python functions, using templated fields helps keep your DAGs idempotent and ensures you aren't executing functions every time the DAG file is parsed. See Avoid top-level code in your DAG file.
Contrary to our best practices, the following example defines variables based on datetime Python functions:
from datetime import datetime, timedelta

# Variables used by tasks
# Bad example - Define today's and yesterday's date using the datetime module
today = datetime.today()
yesterday = datetime.today() - timedelta(1)
If this code is in a DAG file, these functions are executed every time the DAG file is parsed, which can hurt performance. Even more importantly, this doesn't produce an idempotent DAG: you can't rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date.
A better way of implementing this is to use a built-in Airflow template variable:
# Variables used by tasks
# Good example - Define yesterday's date with an Airflow built-in template variable
yesterday = "{{ yesterday_ds_nodash }}"  # rendered at runtime when passed to a templated field
You can use one of the Airflow built-in variables and macros, or you can create your own templated field to pass information at runtime. For more information on this topic, see templating and macros in Airflow.
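For example, here is a minimal sketch of passing the same built-in variable to an operator's templated field, where it is rendered at runtime for each DAG run; the DAG and task names are placeholders:

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def template_example_dag():
    # bash_command is a templated field, so the Jinja expression is rendered
    # with the values of each individual DAG run, keeping the task idempotent
    BashOperator(
        task_id="echo_yesterday",
        bash_command="echo {{ yesterday_ds_nodash }}",
    )

template_example_dag()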
Incremental record filtering
You should break out your pipelines into incremental extracts and loads wherever possible. For example, if you have a DAG that runs hourly, each DAG run should process only records from that hour, rather than the whole dataset. When the results in each DAG run represent only a small subset of your total dataset, a failure in one subset of the data won't prevent the rest of your DAG Runs from completing successfully. If your DAGs are idempotent, you can rerun a DAG for only the data that failed rather than reprocessing the entire dataset.
There are multiple ways you can achieve incremental pipelines.
Last modified date
Using a last modified date is recommended for incremental loads. Ideally, each record in your source system has a column containing the last time the record was modified. Each DAG run then looks for records that were updated within its own date range using this column.
For example, with a DAG that runs hourly, each DAG run is responsible for loading any records that fall between the start and end of its hour. If one of those runs fails, no other DAG runs are affected.
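As a sketch, an hourly DAG could use the built-in data_interval_start and data_interval_end template variables to bound its query; the table and column names here are hypothetical:

from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@hourly", catchup=False)
def incremental_extract():
    # each run only processes records modified within its own hour
    PostgresOperator(
        task_id="extract_changed_records",
        postgres_conn_id="postgres_default",
        sql="""
            SELECT *
            FROM source_table
            WHERE last_modified_date >= '{{ data_interval_start }}'
              AND last_modified_date < '{{ data_interval_end }}';
        """,
    )

incremental_extract()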
Sequence IDs
When a last modified date is unavailable, a sequence or incrementing ID can be used for incremental loads. This logic works best when the source records are only being appended to and not updated. Although implementing a last modified date system in your records is considered best practice, basing your incremental logic off of a sequence ID can be a sound way to filter pipeline records without a last modified date.
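A sketch of this approach, assuming hypothetical source_table and destination_table tables, could filter on the highest ID that has already been loaded:

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@hourly", catchup=False)
def sequence_id_extract():
    @task
    def extract_new_records():
        hook = PostgresHook("database_conn")
        # only pull records that were appended after the last load
        return hook.get_records(
            """
            SELECT *
            FROM source_table
            WHERE id > (SELECT COALESCE(MAX(source_id), 0) FROM destination_table);
            """
        )

    extract_new_records()

sequence_id_extract()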
Avoid top-level code in your DAG file
In the context of Airflow, top-level code refers to any code that is run at the time the DAG is parsed, as opposed to the time the task is run.
Code that is part of an operator or a decorated task is run by Airflow only when the task runs, not when the DAG is parsed. For example, in the following code, call_external_system() is considered top-level code because it runs when the DAG is parsed. x + y is not top-level code, because it is part of the task definition and only runs when the task runs.
@dag(...)
def the_dag():
    @task
    def do_thing():
        x + y

    num_of_things = call_external_system()  # this is "top-level code"

    chain(*[do_thing() for _ in range(num_of_things)])

the_dag()
Generally, any code that isn't part of your DAG or operator instantiations and that makes requests to external systems is of concern. Airflow parses all code in the dags_folder every min_file_process_interval, which defaults to 30 seconds. Therefore, any code that runs when the DAG is parsed and that makes requests to external systems, like an API or a database, or makes function calls outside of your tasks, can cause performance issues because these requests and connections are made every 30 seconds rather than only when the DAG is scheduled to run.
As another example, the following DAG dynamically generates tasks using the PostgresOperator based on records pulled from a different database.
In the Bad practice example, the connection to the other database is made outside of an operator instantiation as top-level code. When the scheduler parses this DAG, it uses the hook and results variables to query the grocery_list table. This query is run every time the DAG is parsed, which can cause performance issues.
The version shown under Good practice wraps the connection to the database in its own task, get_list_of_results. Now the connection is only made when the DAG actually runs, preventing performance issues.
Bad practice:

"""WARNING: This DAG is used as an example for _bad_ Airflow practices. Do not
use this DAG."""

from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime

# Bad practice: top-level code in a DAG file
hook = PostgresHook("database_conn")
results = hook.get_records("SELECT * FROM grocery_list;")

sql_queries = []

for result in results:
    grocery = result[0]
    amount = result[1]
    sql_query = f"INSERT INTO purchase_order VALUES ('{grocery}', {amount});"
    sql_queries.append(sql_query)


@dag(
    start_date=datetime(2023, 1, 1), max_active_runs=3, schedule="@daily", catchup=False
)
def bad_practices_dag_1():
    insert_into_purchase_order_postgres = PostgresOperator.partial(
        task_id="insert_into_purchase_order_postgres",
        postgres_conn_id="postgres_default",
    ).expand(sql=sql_queries)


bad_practices_dag_1()
Good practice:

from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime


@dag(
    start_date=datetime(2023, 1, 1), max_active_runs=3, schedule="@daily", catchup=False
)
def good_practices_dag_1():
    @task
    def get_list_of_results():
        # good practice: wrap database connections into a task
        hook = PostgresHook("database_conn")
        results = hook.get_records("SELECT * FROM grocery_list;")
        return results

    @task
    def create_sql_query(result):
        grocery = result[0]
        amount = result[1]
        sql = f"INSERT INTO purchase_order VALUES ('{grocery}', {amount});"
        return sql

    sql_queries = create_sql_query.expand(result=get_list_of_results())

    insert_into_purchase_order_postgres = PostgresOperator.partial(
        task_id="insert_into_purchase_order_postgres",
        postgres_conn_id="postgres_default",
    ).expand(sql=sql_queries)


good_practices_dag_1()
Treat your DAG file like a config file
Including code that isn't part of your DAG or operator instantiations in your DAG file makes the DAG harder to read, maintain, and update. When possible, leave all of the heavy lifting to the hooks and operators that you instantiate within the file. If your DAGs need to access additional code such as a SQL script or a Python function, consider keeping that code in a separate file that can be read into a DAG run.
The following example DAGs demonstrate the difference between the bad and good practices of including code in your DAGs. In the Bad practice DAG, a SQL query is provided directly to the PostgresOperator, which unnecessarily exposes code in your DAG. In the Good practice DAG, the DAG-level configuration includes template_searchpath
and the PostgresOperator specifies a covid_state_query.sql
file that contains the query to execute.
Bad practice:

"""WARNING: This DAG is used as an example for _bad_ Airflow practices. Do not
use this DAG."""

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from pendulum import datetime, duration

# Default settings applied to all tasks
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=1),
}

today = "2023-01-02"
yesterday = "2023-01-01"


# Instantiate DAG
@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=3,
    schedule="@daily",
    default_args=default_args,
    catchup=False,
)
def bad_practices_dag_2():
    t0 = EmptyOperator(task_id="start")

    # Bad practice: long SQL query directly in the DAG file
    query_1 = PostgresOperator(
        task_id="covid_query_wa",
        postgres_conn_id="postgres_default",
        sql="""WITH yesterday_covid_data AS (
            SELECT *
            FROM covid_state_data
            WHERE date = {{ params.today }}
            AND state = 'WA'
        ),
        today_covid_data AS (
            SELECT *
            FROM covid_state_data
            WHERE date = {{ params.yesterday }}
            AND state = 'WA'
        ),
        two_day_rolling_avg AS (
            SELECT AVG(a.state, b.state) AS two_day_avg
            FROM yesterday_covid_data AS a
            JOIN yesterday_covid_data AS b
            ON a.state = b.state
        )
        SELECT a.state, b.state, c.two_day_avg
        FROM yesterday_covid_data AS a
        JOIN today_covid_data AS b
        ON a.state=b.state
        JOIN two_day_rolling_avg AS c
        ON a.state=b.two_day_avg;""",
        params={"today": today, "yesterday": yesterday},
    )

    t0 >> query_1


bad_practices_dag_2()
Good practice:

from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from pendulum import datetime, duration

# Default settings applied to all tasks
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=1),
}

state = "Ohio"


# Instantiate DAG
@dag(
    start_date=datetime(2023, 1, 1),
    max_active_runs=3,
    schedule="@daily",
    default_args=default_args,
    catchup=False,
    # include path to look for external files
    template_searchpath="/usr/local/airflow/include",
)
def good_practices_dag_2():
    query = PostgresOperator(
        task_id="covid_query_{0}".format(state),
        postgres_conn_id="postgres_default",
        # reference query kept in separate file
        sql="covid_state_query.sql",
        params={"state": f"'{state}'"},
    )

    query


good_practices_dag_2()
Use a consistent method for task dependencies
In Airflow, task dependencies can be set multiple ways. You can use the set_upstream() and set_downstream() functions, or you can use the << and >> bitshift operators. Which method you use is a matter of personal preference, but for readability it's best practice to choose one method and use it consistently.
For example, instead of mixing methods like this:
task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4
Try to be consistent with something like this:
task_1 >> task_2 >> [task_3, task_4]
Leverage Airflow features
To get the most out of Airflow, leverage built-in features and the broader Airflow ecosystem, namely provider packages for third-party integrations, to fulfill specific use cases. Using Airflow in this way makes it easier to scale and pull in the right tools based on your needs.
Make use of provider packages
One of the best aspects of Airflow is its robust and active community, which has resulted in integrations between Airflow and other tools known as provider packages.
Provider packages let you orchestrate third party data processing jobs directly from Airflow. Wherever possible, it's recommended that you make use of these integrations rather than writing Python functions yourself. This makes it easier for organizations using existing tools to adopt Airflow, and you don't have to write new code.
For more information about the available provider packages, see the Astronomer Registry.
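For example, a sketch of loading staged files into a warehouse with an operator from the Amazon provider package might look like the following; the bucket, table, and connection names are hypothetical, and it assumes the apache-airflow-providers-amazon package is installed:

from airflow.decorators import dag
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def provider_example_dag():
    # the provider-maintained operator handles the transfer logic,
    # so no custom Python code is needed for the load itself
    S3ToRedshiftOperator(
        task_id="load_orders_to_redshift",
        s3_bucket="my-example-bucket",
        s3_key="orders/{{ ds }}/",
        schema="public",
        table="orders",
        copy_options=["FORMAT AS PARQUET"],
        redshift_conn_id="redshift_default",
        aws_conn_id="aws_default",
    )

provider_example_dag()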
Decide where to run data processing jobs
There are many options available for implementing data processing. For small to medium scale workloads, it is typically safe to do your data processing within Airflow as long as you allocate enough resources to your Airflow infrastructure. Large data processing jobs are typically best offloaded to a framework specifically optimized for those use cases, such as Apache Spark. You can then use Airflow to orchestrate those jobs.
Astronomer recommends that you consider the size of your data now and in the future when deciding whether to process data within Airflow or offload to an external tool. Follow these recommendations if your use case is well suited to processing data within Airflow:
- Ensure your Airflow infrastructure has the necessary resources.
- Use the Kubernetes Executor to isolate task processing and have more control over resources at the task level, as shown in the sketch after this list.
- Use a custom XCom backend if you need to pass any data between the tasks so you don't overload your metadata database.
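For example, with the Kubernetes executor you can request task-level resources by passing a pod_override through executor_config. This is only a minimal sketch: the resource values and task name are hypothetical, and it assumes the kubernetes Python client is installed.

from airflow.decorators import task
from kubernetes.client import models as k8s

# hypothetical resource values, only honored when running on the Kubernetes executor
heavy_task_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",  # the main task container in an Airflow worker pod
                    resources=k8s.V1ResourceRequirements(
                        requests={"cpu": "1", "memory": "2Gi"},
                        limits={"cpu": "2", "memory": "4Gi"},
                    ),
                )
            ]
        )
    )
}

@task(executor_config=heavy_task_config)
def process_large_dataset():
    ...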
Use intermediary data storage
Because it requires less code and fewer pieces, it can be tempting to write your DAGs to move data directly from your source to your destination. However, this means you can't individually rerun the extract or load portions of the pipeline. By putting an intermediary storage layer, such as Amazon S3 or SQL staging tables, between your source and destination, you can separate the testing and rerunning of the extract and load.
Depending on your data retention policy, you could modify the load logic and rerun the entire historical pipeline without having to rerun the extracts. This is also useful in situations where you no longer have access to the source system, such as when you hit an API limit.
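The following is a minimal sketch of this pattern using the S3Hook from the Amazon provider package; the bucket, key, and connection names are hypothetical, and the extract itself is a stand-in:

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from pendulum import datetime

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def staged_pipeline():
    @task
    def extract_to_s3(ds=None):
        # ds (the run's logical date) is injected from the Airflow context
        key = f"raw/orders/{ds}.csv"
        data = "order_id,amount\n1,10\n2,20"  # stand-in for the real extract
        S3Hook(aws_conn_id="aws_default").load_string(
            string_data=data, key=key, bucket_name="my-example-bucket", replace=True
        )
        return key

    @task
    def load_from_s3(key):
        # the load reads the staged copy, so it can be rerun without re-extracting
        data = S3Hook(aws_conn_id="aws_default").read_key(
            key=key, bucket_name="my-example-bucket"
        )
        print(data)

    load_from_s3(extract_to_s3())

staged_pipeline()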
Use the right tool for the job
Airflow excels at coarse-grained parallelism. If you need to do large-scale transformations or fine-grained parallelism, consider using Airflow to trigger other data processing frameworks.
Other best practices
Here are a few other noteworthy best practices that you should follow.
Use a consistent file structure
Having a consistent file structure for Airflow projects keeps things organized and easy to adopt. This is the structure that Astronomer uses:
├── dags/ # Where your DAGs go
│ └── example-dag.py # An example dag that comes with the initialized project
├── Dockerfile # For Astronomer's Docker image and runtime overrides
├── include/ # For any other files you'd like to include
├── plugins/ # For any custom or community Airflow plugins
├── packages.txt # For OS-level packages
└── requirements.txt # For any Python packages
Use DAG name and start date properly
You should always use a static start_date
with your DAGs. A dynamic start_date
is misleading, and can cause failures when clearing out failed task instances and missing DAG runs.
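For example, a minimal sketch of a static, timezone-aware start_date (the DAG name is a placeholder):

from airflow.decorators import dag
from pendulum import datetime

# Good practice: a static, timezone-aware start_date
@dag(start_date=datetime(2023, 1, 1, tz="UTC"), schedule="@daily", catchup=False)
def static_start_date_dag():
    ...

static_start_date_dag()

# Bad practice: a dynamic value such as datetime.today() changes every time the
# DAG file is parsed and should not be used as a start_date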
Additionally, if you change the start_date
of your DAG you should also change the DAG name. Changing the start_date
of a DAG creates a new entry in the Airflow database. This can confuse the Scheduler because there are two DAGs with the same name but different schedules.
Changing the name of a DAG also creates a new entry in the database that powers the dashboard. Follow a consistent naming convention since changing a DAG's name doesn't delete the entry in the database for the old name.