Test Airflow DAGs
Effectively testing DAGs requires an understanding of their structure and their relationship to other code and data in your environment. In this guide, you'll learn about various types of DAG validation testing, unit testing, and where to find further information on data quality checks.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Python testing basics. See Getting Started with Testing in Python.
- At least one Python test runner. This guide mostly uses `pytest`, but you can use others, including `nose2` and `unittest`.
- CI/CD for Python scripts. See Continuous Integration with Python: An Introduction.
- Basic Airflow and Astro CLI concepts. See Get started with Airflow.
Write DAG validation tests
DAG validation tests ensure that your DAGs fulfill a list of criteria. Using validation tests can help you:
- Develop DAGs without access to a local Airflow environment.
- Ensure that custom DAG requirements are systematically checked and fulfilled.
- Test DAGs automatically in a CI/CD pipeline.
- Enable power users to test DAGs from the CLI.
At a minimum, you should run DAG validation tests to check for import errors. Additional tests can check custom logic, such as ensuring that `catchup` is set to `False` for every DAG in your Airflow instance, or that only `tags` from a defined list are used in your DAGs.
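For example, a minimal sketch of a catchup check, assuming a pytest suite and the same `DagBag` approach used in the examples below:

```python
from airflow.models import DagBag


def test_catchup_is_false():
    """Check that every DAG in the DagBag has catchup disabled."""
    dag_bag = DagBag(include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.catchup is False, f"{dag_id} has catchup enabled"
```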
DAG validation tests apply to all DAGs in your Airflow environment, so you only need to create one test suite.
Common DAG validation tests
This section covers the most common types of DAG validation tests with full code examples.
Check for import errors
The most common DAG validation test checks for import errors. Checking for import errors through a validation test is faster than starting your Airflow environment and checking for errors in the Airflow UI. In the following test, `get_import_errors` checks the `.import_errors` attribute of the current `DagBag`.
```python
import os

import pytest
from airflow.models import DagBag


def get_import_errors():
    """
    Generate a list of tuples with the import errors in the DagBag.
    """
    dag_bag = DagBag(include_examples=False)

    def strip_path_prefix(path):
        return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

    # prepend "(None, None)" to ensure that a test object is always created even if it's a no-op
    return [(None, None)] + [
        (strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()
    ]


@pytest.mark.parametrize(
    "rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
)
def test_file_imports(rel_path, rv):
    """Test for import errors on a file."""
    if rel_path and rv:
        raise Exception(f"{rel_path} failed to import with message \n {rv}")
```
Check for custom code requirements
Airflow DAGs support many types of custom plugins and code. It is common for data engineering teams to define best practices and custom rules around how their DAGs should be written and create DAG validation tests to ensure those standards are met.
The following code snippet includes a test that checks that all DAGs have their `tags` parameter set to one or more of the `APPROVED_TAGS`.
```python
import os

import pytest
from airflow.models import DagBag


def get_dags():
    """
    Generate a list of (dag_id, DAG object, fileloc) tuples from the DagBag.
    """
    dag_bag = DagBag(include_examples=False)

    def strip_path_prefix(path):
        return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

    return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]


APPROVED_TAGS = {"customer_success", "op_analytics", "product"}


@pytest.mark.parametrize(
    "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]
)
def test_dag_tags(dag_id, dag, fileloc):
    """
    Test if a DAG is tagged and if those tags are in the approved list.
    """
    assert dag.tags, f"{dag_id} in {fileloc} has no tags"
    if APPROVED_TAGS:
        assert not set(dag.tags) - APPROVED_TAGS
```
You can view the attributes and methods available for the `dag` model in the Airflow documentation.

You can also set requirements at the task level by accessing the `tasks` attribute within the `dag` model, which contains a list of all task objects of a DAG. The test below checks that all DAGs contain at least one task and that all tasks use `trigger_rule="all_success"`.
```python
@pytest.mark.parametrize(
    "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]
)
def test_dag_tasks(dag_id, dag, fileloc):
    """
    Test if all DAGs contain a task and all tasks use the trigger_rule all_success.
    """
    assert dag.tasks, f"{dag_id} in {fileloc} has no tasks"
    for task in dag.tasks:
        t_rule = task.trigger_rule
        assert (
            t_rule == "all_success"
        ), f"{task} in {dag_id} has the trigger rule {t_rule}"
```
Implement DAG validation tests
Airflow offers different ways to run DAG validation tests using any Python test runner. This section gives an overview of the most common implementation methods. If you are new to testing Airflow DAGs, you can quickly get started by using Astro CLI commands.
Airflow CLI
The Airflow CLI offers two commands related to local testing:

- `airflow dags test`: Given a DAG ID and execution date, this command writes the results of a single DAG run to the metadata database. This command is useful for testing DAGs by creating manual DAG runs from the command line. In Airflow 2.10+, you can skip tasks based on their task ID with the `--mark-success-pattern` flag and run tasks with their configured executor(s) with `--use-executor`. See the example after this list.
- `airflow tasks test`: This command tests one specific task instance without checking for dependencies or recording the outcome in the metadata database.
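For example, a sketch of a local test run, assuming a DAG with the ID `my_dag` that contains sensor tasks whose IDs match `sensor.*`:

```sh
# run a single manual DAG run of my_dag for 2023-01-29,
# auto-marking matching sensor tasks as successful (Airflow 2.10+)
airflow dags test my_dag 2023-01-29 --mark-success-pattern "sensor.*"
```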
With the Astro CLI, you can run all Airflow CLI commands using `astro dev run`. For example, to run `airflow dags test` on the DAG `my_dag` for the execution date of `2023-01-29`, run:

```sh
astro dev run dags test my_dag '2023-01-29'
```
The Astro CLI
The Astro CLI includes a suite of commands to help simplify common testing workflows. See Test your Astro project locally.
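For example, two of these commands, sketched here assuming the default Astro project structure:

```sh
# parse the DAGs in your project for import errors without starting Airflow
astro dev parse

# run the pytest tests in the tests/ directory of your Astro project
astro dev pytest
```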
Test DAGs in a CI/CD pipeline
You can use CI/CD tools to test and deploy your Airflow code. By installing the Astro CLI into your CI/CD process, you can test your DAGs before deploying them to a production environment. See set up CI/CD for example implementations.
Astronomer customers can use the Astro GitHub integration, which allows you to automatically deploy code from a GitHub repository to an Astro deployment and view Git metadata in the Astro UI. See Deploy code with the Astro GitHub integration.
Add test data or files for local testing
Use the `include` folder of your Astro project to store files for testing locally, such as test data or a dbt project file. The files in your `include` folder are included in your deploys to Astro, but they are not parsed by Airflow. Therefore, you don't need to specify them in `.airflowignore` to prevent parsing.
If you're running Airflow locally, apply your changes by refreshing the Airflow UI.
Debug interactively with dag.test()
The `dag.test()` method allows you to run all tasks in a DAG within a single serialized Python process, without running the Airflow scheduler. This lets you iterate faster and use IDE debugging tools when developing DAGs.
This functionality replaces the deprecated DebugExecutor. Learn more in the Airflow documentation.
Prerequisites
Ensure that your testing environment has:
- Airflow 2.5.0 or later. You can check your version by running `airflow version`.
- All provider packages that your DAG uses.
- An initialized Airflow metadata database, if your DAG uses elements of the metadata database like XCom. The Airflow metadata database is created when Airflow is first run in an environment. You can check that it exists with `airflow db check` and initialize a new database with `airflow db migrate` (`airflow db init` in Airflow versions pre-2.7).
You may wish to install these requirements and test your DAGs in a virtualenv to avoid dependency conflicts in your local environment.
Setup
To use `dag.test()`, you only need to add a few lines of code to the end of your DAG file. If you are using a traditional DAG context, call `dag.test()` after your DAG declaration. If you are using the `@dag` decorator, assign your DAG function to a new object and call the method on that object.
Traditional DAG context:

```python
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id="simple_classic_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:  # assigning the context to an object is mandatory for using dag.test()
    t1 = EmptyOperator(task_id="t1")

if __name__ == "__main__":
    dag.test()
```
`@dag` decorator:

```python
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
)
def my_dag():
    t1 = EmptyOperator(task_id="t1")


dag_object = my_dag()

if __name__ == "__main__":
    dag_object.test()
```
You can run the `.test()` method with popular debugging tools such as:

- VSCode.
- PyCharm.
- Tools like The Python Debugger and the built-in `breakpoint()` function. These allow you to run `dag.test()` from the command line by running `python <path-to-dag-file>`, as shown in the sketch after this list.
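For example, a minimal sketch of dropping into the built-in debugger inside a task, using a hypothetical `debug_demo_dag`:

```python
from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def debug_demo_dag():
    @task
    def transform():
        data = [1, 2, 3]
        breakpoint()  # drops into pdb when this file is run with python
        return [x * 2 for x in data]

    transform()


dag_object = debug_demo_dag()

if __name__ == "__main__":
    dag_object.test()
```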
Use dag.test() with the Astro CLI

If you use the Astro CLI exclusively and do not have the `airflow` package installed locally, you can still debug using `dag.test()` by running `astro dev start`, entering the scheduler container with `astro dev bash -s`, and executing `python <path-to-dag-file>` from within the Docker container. Unlike using the base `airflow` package, this testing method requires starting up a complete Airflow environment.
Use variables and connections in dag.test()
To debug your DAGs in a more realistic environment, you can pass the following Airflow environment configurations to `dag.test()`:

- `execution_date`, passed as a `pendulum.datetime` object.
- Airflow connections, passed as a `.yaml` file.
- Airflow variables, passed as a `.yaml` file.
- DAG configuration, passed as a dictionary.
This is useful for testing your DAG for different dates or with different connections and configurations. The following code snippet shows the syntax for passing various parameters to `dag.test()`:

```python
from pendulum import datetime

if __name__ == "__main__":
    conn_path = "connections.yaml"
    variables_path = "variables.yaml"
    my_conf_var = 23

    dag.test(
        execution_date=datetime(2023, 1, 29),
        conn_file_path=conn_path,
        variable_file_path=variables_path,
        run_conf={"my_conf_var": my_conf_var},
    )
```
The `connections.yaml` file should list connections with their properties, as shown in the following example:

```yaml
my_aws_conn:
  conn_type: amazon
  login: <your-AWS-key>
  password: <your-AWS-secret>
  conn_id: my_aws_conn
```
Variables in a `variables.yaml` file need to be listed with their `key` and `value`:

```yaml
my_variable:
  key: my_variable
  value: 42
```
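As an illustration, a sketch of a task that would pick up these test values, assuming the `my_aws_conn` and `my_variable` IDs above and the `run_conf` from the earlier snippet:

```python
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from airflow.models import Variable


@task
def use_test_resources(**context):
    conn = BaseHook.get_connection("my_aws_conn")  # resolved from connections.yaml
    answer = Variable.get("my_variable")  # resolved from variables.yaml
    my_conf_var = context["dag_run"].conf.get("my_conf_var")  # from run_conf
    print(conn.login, answer, my_conf_var)
```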
By default, `dag.test()` runs tasks without an executor. In Airflow 2.10+, you can run tasks with their configured executors by setting `use_executor` to `True` in `dag.test()`.
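A minimal sketch of this option:

```python
if __name__ == "__main__":
    # Airflow 2.10+: run tasks with their configured executor instead of in-process
    dag.test(use_executor=True)
```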
Skip tasks when using dag.test()
Airflow 2.10 added the ability to skip tasks whose IDs match a provided regex pattern when using `dag.test()`. This is particularly useful when you have sensors in a DAG that you'd like to bypass during testing.
```python
if __name__ == "__main__":
    dag.test(
        # new in Airflow 2.10
        mark_success_pattern="sensor.*",  # regex of task IDs to auto-mark as successful
    )
```
Unit testing
Unit testing is a software testing method where small chunks of source code are tested individually to ensure they function correctly. The objective is to isolate testable logic inside small, well-named functions. For example:

```python
def test_function_returns_5():
    assert my_function(input) == 5
```
In the context of Airflow, you can write unit tests for any part of your DAG, but they are most frequently applied to hooks and operators. All Airflow hooks, operators, and provider packages must pass unit testing before code can be merged into the project. For an example of unit testing, see the AWS `S3Hook` and its associated unit tests.
If you are using custom hooks or operators, Astronomer recommends using unit tests to check the logic and functionality. In the following example, a custom operator checks if a number is even:
```python
from airflow.models import BaseOperator


class EvenNumberCheckOperator(BaseOperator):
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super().__init__(*args, **kwargs)

    def execute(self, context):
        # an even number modulo 2 is 0
        if self.operator_param % 2 == 0:
            return True
        else:
            return False
```
You then write a `test_evencheckoperator.py` file with unit tests similar to the following example:

```python
import unittest
from datetime import datetime

from airflow.models import TaskInstance
from airflow.models.dag import DAG

# assumes the operator above lives in a module on your Python path,
# for example include/even_number_check_operator.py
from include.even_number_check_operator import EvenNumberCheckOperator

DEFAULT_DATE = datetime(2021, 1, 1)


class TestEvenNumberCheckOperator(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.dag = DAG(
            "test_dag", default_args={"owner": "airflow", "start_date": DEFAULT_DATE}
        )
        self.even = 10
        self.odd = 11

    def test_even(self):
        """Tests that the EvenNumberCheckOperator returns True for 10."""
        task = EvenNumberCheckOperator(
            my_operator_param=self.even, task_id="even", dag=self.dag
        )
        ti = TaskInstance(task=task, execution_date=datetime.now())
        result = task.execute(ti.get_template_context())
        assert result is True

    def test_odd(self):
        """Tests that the EvenNumberCheckOperator returns False for 11."""
        task = EvenNumberCheckOperator(
            my_operator_param=self.odd, task_id="odd", dag=self.dag
        )
        ti = TaskInstance(task=task, execution_date=datetime.now())
        result = task.execute(ti.get_template_context())
        assert result is False
```
If your DAGs contain `PythonOperator` tasks that execute your own Python functions, it is recommended that you write unit tests for those functions as well.
The most common way to implement unit tests in production is to automate them as part of your CI/CD process. Your CI tool executes the tests and stops the deployment process when errors occur.
Mocking
Mocking is the imitation of an external system, dataset, or other object. For example, you might use mocking with an Airflow unit test if you are testing a connection, but don't have access to the metadata database. Mocking could also be used when you need to test an operator that executes an external service through an API endpoint, but you don't want to wait for that service to run a simple test.
Many Airflow tests use mocking. The blog post Testing and debugging Apache Airflow discusses Airflow mocking and can help you get started.
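For illustration, a minimal sketch of mocking a connection with `unittest.mock` so the test never touches the metadata database, reusing the `my_aws_conn` connection ID from earlier:

```python
from unittest import mock

from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection


# patch BaseHook.get_connection to return an in-memory Connection object
@mock.patch(
    "airflow.hooks.base.BaseHook.get_connection",
    return_value=Connection(conn_id="my_aws_conn", conn_type="amazon"),
)
def test_connection_is_mocked(mock_get_connection):
    conn = BaseHook.get_connection("my_aws_conn")
    assert conn.conn_type == "amazon"
    mock_get_connection.assert_called_once_with("my_aws_conn")
```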
Data quality checks
Testing your DAG ensures that your code fulfills your requirements. But even if your code is perfect, data quality issues can break or negatively affect your pipelines. Airflow, being at the center of the modern data engineering stack, is the ideal tool for checking data quality.
Data quality checks differ from code-related testing because the data is not static like your DAG code. It is best practice to incorporate data quality checks into your DAGs and use Airflow dependencies and branching to handle what should happen in the event of a data quality issue, from halting the pipeline to sending notifications to data quality stakeholders.
There are many ways you can integrate data quality checks into your DAG:
- SQL check operators: Airflow-native operators that run highly customizable data quality checks on a wide variety of relational databases. See the sketch after this list.
- Great Expectations: A data quality testing suite with an Airflow provider offering the ability to define data quality checks in JSON to run on relational databases, Spark and pandas DataFrames.
- Soda Core: A framework to check data quality using YAML configuration to define data quality checks to run on relational databases and Spark DataFrames.
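For example, a sketch using the `SQLColumnCheckOperator` from the common SQL provider, assuming a hypothetical `my_db_conn` connection and `customers` table:

```python
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

# runs the checks directly in the target database
check_columns = SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="my_db_conn",  # assumed connection to a relational database
    table="customers",  # assumed table name
    column_mapping={
        "customer_id": {"null_check": {"equal_to": 0}},  # no NULL ids allowed
        "age": {"min": {"geq_to": 0}},  # ages must be non-negative
    },
)
```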
Data quality checks work better at scale if you design your DAGs to load or process data incrementally. To learn more about incremental loading, see DAG Writing Best Practices in Apache Airflow. Processing smaller, incremental chunks of data in each DAG Run ensures that any data quality issues have a limited effect.