Datasets and data-aware scheduling in Airflow
With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.
Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and a machine learning team with a DAG that trains a model on the dataset. Using datasets, the machine learning team's DAG runs only when the data engineering team's DAG has produced an update to the dataset.
In this guide, you'll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates.
Datasets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.
There are multiple resources for learning about this topic. See also:
- Astronomer Academy: Airflow: Datasets module.
- Webinar: Data Driven Scheduling.
- Use case: Orchestrate machine learning pipelines with Airflow datasets.
Assumed knowledge
To get the most out of this guide, you should have existing knowledge of:
- Airflow scheduling concepts. See Schedule DAGs in Airflow.
Why use Airflow datasets?
Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:
- Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
- Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
- Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
- Reduce costs, because datasets do not use a worker slot, in contrast to sensors or other implementations of cross-DAG dependencies.
- Create cross-deployment dependencies using the Airflow REST API. Astro customers can use the Cross-deployment dependencies best practices documentation for guidance.
- (Airflow 2.9+) Create complex data-driven schedules using Conditional Dataset Scheduling and Combined Dataset and Time-based Scheduling.
Dataset concepts
You can define datasets in your DAG code and use them to create cross-DAG or even cross-Deployment dependencies. This section covers definitions for dataset terminology, as well as general information on how to use them.
Dataset terminology
You can define datasets in your DAG code and use them to create cross-DAG dependencies. Airflow uses the following terms related to the datasets feature:
- Dataset: an object that is defined by a unique URI. Airflow parses the URI for validity and there are some constraints on how you can define it. If you want to avoid validity parsing, prefix your dataset name with `x-` for Airflow to treat it as a string. See What is a valid URI? for detailed information.
- Dataset event: an event that is attached to a dataset and created whenever a producer task updates that particular dataset. A dataset event is defined by the dataset it is attached to plus the timestamp of when a producer task updated the dataset. Optionally, a dataset event can contain an `extra` dictionary with additional information about the dataset or dataset event.
- Dataset schedule: the schedule of a DAG that is triggered as soon as dataset events for one or more datasets are created. All datasets a DAG is scheduled on are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Datasets tab.
- Producer task: a task that produces updates to one or more datasets provided to its `outlets` parameter, creating dataset events when it completes successfully.
- Dataset expression: (Airflow 2.9+) a logical expression using AND (`&`) and OR (`|`) operators to define the schedule of a DAG scheduled on updates to several datasets.
- Queued dataset event: It is common to have DAGs scheduled to run as soon as a set of datasets have received at least one update each. While there are still dataset events missing to trigger the DAG, all dataset events for other datasets the DAG is scheduled on are queued dataset events. A queued dataset event is defined by its dataset, timestamp, and the DAG it is queuing for. One dataset event can create a queued dataset event for several DAGs. As of Airflow 2.9, you can access queued dataset events for a specific DAG or a specific dataset programmatically, using the Airflow REST API.
- DatasetAlias (Airflow 2.10+): an object that can be associated with one or more datasets and used to create schedules based on datasets created at runtime; see Use dataset aliases. A dataset alias is defined by a unique name.
- Metadata (Airflow 2.10+): a class to attach `extra` information to a dataset from within the producer task. This functionality can be used to pass dataset-related metadata between tasks, see Attaching information to a dataset event.
Two parameters relating to Airflow datasets exist in all Airflow operators and decorators:
- Outlets: a task parameter that contains the list of datasets a specific task produces updates to, as soon as it completes successfully. All outlets of a task are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Datasets tab as soon as the DAG code is parsed, independently of whether any dataset events have occurred. Note that Airflow is not yet aware of the underlying data. It is up to you to determine which tasks should be considered producer tasks for a dataset. As long as a task has an outlet dataset, Airflow considers it a producer task even if that task doesn't operate on the referenced dataset.
- Inlets: a task parameter that contains the list of datasets a specific task has access to, typically to access `extra` information from related dataset events. Defining inlets for a task does not affect the schedule of the DAG containing the task, and the relationship is not reflected in the Airflow UI.
To summarize, tasks produce updates to datasets given to their `outlets` parameter, and this action creates dataset events. DAGs can be scheduled based on dataset events created for one or more datasets, and tasks can be given access to all events attached to a dataset by defining the dataset as one of their `inlets`. A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either the `outlets` parameter of a task or the `schedule` of a DAG.
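For illustration, the snippet below defines two dataset objects: one with a URI that Airflow parses for validity and one with the `x-` prefix that skips validity parsing. The URIs and variable names are placeholders.

from airflow.datasets import Dataset

# URI-style name: Airflow parses it for validity
my_dataset = Dataset("s3://my-bucket/my-key/")

# "x-" prefix: Airflow treats the name as a plain string and skips URI validity parsing
my_string_dataset = Dataset("x-my-dataset")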
Using datasets
When you work with datasets, keep the following considerations in mind:
- Dataset events are only registered by DAGs or listeners in the same Airflow environment. If you want to create cross-Deployment dependencies with datasets, you need to use the Airflow REST API to create a dataset event in the Airflow environment where your downstream DAG is located. See Cross-deployment dependencies for an example implementation on Astro.
- Airflow monitors datasets only within the context of DAGs and tasks. It does not monitor updates to datasets that occur outside of Airflow. For example, Airflow will not notice if you manually add a file to an S3 bucket referenced by a dataset. To create Airflow dependencies based on outside events, use Airflow sensors.
- The Datasets tab in the Airflow UI provides an overview of recent dataset events and existing datasets, as well as a graph showing all dependencies between DAGs containing producer tasks, datasets, and consumer DAGs. See Datasets tab for more information.
As of Airflow 2.8, you can use listeners to enable Airflow to run any code when certain dataset events occur anywhere in your Airflow instance. There are two listener hooks for the following events:
- on_dataset_created
- on_dataset_changed
For examples, refer to our Create Airflow listeners tutorial. Dataset event listeners are an experimental feature.
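The sketch below shows what such a listener could look like when registered through an Airflow plugin. The file location, plugin name, and print statements are assumptions for illustration, and exact hook signatures can vary between Airflow versions, so treat this as a starting point rather than a reference implementation.

# plugins/dataset_listener.py (assumed location): a minimal dataset event listener sketch
import sys

from airflow.datasets import Dataset
from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin


@hookimpl
def on_dataset_created(dataset: Dataset):
    # runs when a new dataset is first registered in the Airflow environment
    print(f"Dataset created: {dataset.uri}")


@hookimpl
def on_dataset_changed(dataset: Dataset):
    # runs when a dataset event is created for this dataset
    print(f"Dataset changed: {dataset.uri}")


class DatasetListenerPlugin(AirflowPlugin):
    name = "dataset_listener_plugin"
    # register this module so Airflow picks up the hook implementations above
    listeners = [sys.modules[__name__]]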
Dataset definition
A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either the `outlets` parameter of a task or the `schedule` of a DAG. Airflow 2.10 added the ability to create dataset aliases, see Use dataset aliases.
Basic Dataset definition
The simplest dataset schedule is one DAG scheduled based on updates to one dataset that is updated by one task. In this example, we define that the `my_producer_task` task in the `my_producer_dag` DAG produces updates to the `s3://my-bucket/my-key/` dataset, creating attached dataset events, and schedule the `my_consumer_dag` DAG to run once for every dataset event created.
First, provide the dataset to the outlets parameter of the producer task.
- TaskFlow API
- Traditional syntax
from airflow.decorators import dag, task
from airflow.datasets import Dataset


@dag(
    start_date=None,
    schedule=None,
    catchup=False,
)
def my_producer_dag():
    @task(outlets=[Dataset("s3://my-bucket/my-key/")])
    def my_producer_task():
        pass

    my_producer_task()


my_producer_dag()
from airflow.models.dag import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="my_producer_dag",
    start_date=None,
    schedule=None,
    catchup=False,
):

    def my_function():
        pass

    my_task = PythonOperator(
        task_id="my_producer_task",
        python_callable=my_function,
        outlets=[Dataset("s3://my-bucket/my-key/")],
    )
You can see the relationship between the DAG containing the producing task (`my_producer_dag`) and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot uses Airflow 2.10; the UI might look different in previous versions.

In Airflow 2.9+, the graph view of the `my_producer_dag` shows the dataset as well.
Next, schedule the `my_consumer_dag` to run as soon as a new dataset event is produced to the `s3://my-bucket/my-key/` dataset.
- TaskFlow API
- Traditional syntax
from airflow.decorators import dag
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(
    start_date=datetime(2024, 8, 1),
    schedule=[Dataset("s3://my-bucket/my-key/")],
    catchup=False,
)
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")


my_consumer_dag()
from airflow.models.dag import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id="my_consumer_dag",
    start_date=datetime(2024, 8, 1),
    schedule=[Dataset("s3://my-bucket/my-key/")],
    catchup=False,
):
    EmptyOperator(task_id="empty_task")
You can see the relationship between the DAG containing the producing task (`my_producer_dag`), the consuming DAG (`my_consumer_dag`), and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot uses Airflow 2.10; the UI might look different in previous versions.

In Airflow 2.9+, the graph view of the `my_consumer_dag` shows the dataset as well.
After unpausing the `my_consumer_dag`, every successful completion of the `my_producer_task` task triggers a run of the `my_consumer_dag`.

In Airflow 2.10+, the producing task lists the dataset events it caused on its details page, including links to the triggered DAG runs.

The triggered DAG run of the `my_consumer_dag` also lists the dataset event, including a link to the source DAG from which the dataset event was created.
Use dataset aliases
In Airflow 2.10+, you have the option to create dataset aliases to schedule DAGs based on datasets with URIs generated at runtime. A dataset alias is defined by a unique `name` string and can be used in place of a regular dataset in `outlets` and `schedule` parameters.
Any number of dataset events updating different datasets can be attached to a dataset alias.
There are two ways to add a dataset event to a dataset alias:

- Using the `Metadata` class.
- Using `outlet_events` pulled from the Airflow context.
See the code below for examples; note how the URI of the dataset is determined at runtime inside the producing task.
- Metadata
- Outlet events
# from airflow.decorators import task
# from airflow.datasets import Dataset, DatasetAlias
# from airflow.datasets.metadata import Metadata

my_alias_name = "my_alias"


@task(outlets=[DatasetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
    bucket_name = "my-bucket"  # determined at runtime, for example based on upstream input

    yield Metadata(
        Dataset(f"s3://{bucket_name}/my-task"),
        extra={"k": "v"},  # extra has to be provided, can be {}
        alias=my_alias_name,
    )


attach_event_to_alias_metadata()
# from airflow.decorators import task
# from airflow.datasets import Dataset, DatasetAlias

my_alias_name = "my_alias"


@task(outlets=[DatasetAlias(my_alias_name)])
def attach_event_to_alias_context(**context):
    bucket_name = "my-other-bucket"  # determined at runtime, for example based on upstream input

    outlet_events = context["outlet_events"]
    outlet_events[my_alias_name].add(
        Dataset(f"s3://{bucket_name}/my-task"), extra={"k": "v"}
    )  # extra is optional


attach_event_to_alias_context()
In the consuming DAG you can use a dataset alias in place of a regular dataset.
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.datasets import DatasetAlias
from pendulum import datetime

my_alias_name = "my_alias"


@dag(
    start_date=datetime(2024, 8, 1),
    schedule=[DatasetAlias(my_alias_name)],
    catchup=False,
)
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")


my_consumer_dag()
Since the dataset event is generated at runtime with a dynamic URI, Airflow does not know in advance which dataset will trigger the run of the `my_consumer_dag`. The Airflow UI displays `Unresolved DatasetAlias` as the DAG schedule for DAGs that are only scheduled on aliases that have never had a dataset event attached to them.

Once the `my_producer_dag` containing the `attach_event_to_alias_metadata` task completes successfully, reparsing of all DAGs scheduled on the dataset alias `my_alias` is automatically triggered. This reparsing step attaches the `s3://my-bucket/my-task` dataset to the `my_alias` dataset alias and the schedule resolves, triggering one run of the `my_consumer_dag`.

Any further dataset event for the `s3://my-bucket/my-task` dataset will now trigger the `my_consumer_dag`. If you attach dataset events for several datasets to the same dataset alias, a DAG scheduled on that alias runs as soon as any of the datasets that were ever attached to it receives an update.
See Dynamic data events emitting and dataset creation through DatasetAlias for more information and examples of using dataset aliases.
To use dataset aliases with traditional operators, you need to attach the dataset event to the alias inside the operator logic. If you are using operators other than the PythonOperator, you can either do so in a custom operator's `.execute` method or by passing a `post_execute` callable to existing operators (experimental). Use `outlet_events` when attaching dataset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching a dataset event to an alias is only supported in the `execute_complete` or `post_execute` method.
# from airflow.operators.bash import BashOperator
# from airflow.datasets import Dataset, DatasetAlias

my_alias_name = "my_alias"


def _attach_event_to_alias(context, result):  # result = the return value of the execute method
    # use any logic to determine the URI
    uri = "s3://my-bucket/my_file.txt"
    context["outlet_events"][my_alias_name].add(Dataset(uri))


BashOperator(
    task_id="t2",
    bash_command="echo hi",
    outlets=[DatasetAlias(my_alias_name)],
    post_execute=_attach_event_to_alias,  # using the post_execute parameter is experimental
)
Click to view an example of a custom operator attaching a dataset event to a dataset alias.
"""
### Dataset Alias in a custom operator
"""
from airflow.decorators import dag
from airflow.datasets import Dataset, DatasetAlias
from pendulum import datetime
import logging
t_log = logging.getLogger("airflow.task")
my_alias_name = "my-alias"
# import the operator to inherit from
from airflow.models.baseoperator import BaseOperator
# custom operator producing to a dataset alias
class MyOperator(BaseOperator):
"""
Simple example operator that attaches a dataset event to a dataset alias.
:param my_bucket_name: (str) The name of the bucket to use in the dataset URI.
"""
# define the .__init__() method that runs when the DAG is parsed
def __init__(self, my_bucket_name, my_alias_name, *args, **kwargs):
# initialize the parent operator
super().__init__(*args, **kwargs)
# assign class variables
self.my_bucket_name = my_bucket_name
self.my_alias_name = my_alias_name
def execute(self, context):
# add your custom operator logic here
# use any logic to derive the dataset URI
my_uri = f"s3://{self.my_bucket_name}/my_file.txt"
context["outlet_events"][self.my_alias_name].add(Dataset(my_uri))
return "hi :)"
# define the .post_execute() method that runs after the execute method (optional)
# result is the return value of the execute method
def post_execute(self, context, result=None):
# write to Airflow task logs
self.log.info("Post-execution step")
# It is also possible to add events to the alias in the post_execute method
@dag(
start_date=datetime(2024, 8, 1),
schedule=None,
catchup=False,
doc_md=__doc__,
)
def dataset_alias_custom_operator():
MyOperator(
task_id="t1",
my_bucket_name="my-bucket",
my_alias_name=my_alias_name,
outlets=[DatasetAlias(my_alias_name)],
)
dataset_alias_custom_operator()
Updating a dataset
As of Airflow 2.9+, there are three ways to update a dataset:

- A task with an `outlets` parameter that references the dataset completes successfully.
- A `POST` request to the datasets endpoint of the Airflow REST API.
- A manual update in the Airflow UI.
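For reference, a dataset update via the REST API is a simple `POST` request. The sketch below is a minimal example assuming a locally running webserver with basic auth and the stable REST API's dataset events endpoint (Airflow 2.9+); the host, credentials, and URI are placeholders, so verify the endpoint and authentication method against your deployment's API reference.

import requests

# Assumptions: local webserver URL and basic auth credentials; adjust for your deployment.
AIRFLOW_HOST = "http://localhost:8080"

response = requests.post(
    f"{AIRFLOW_HOST}/api/v1/datasets/events",
    json={
        "dataset_uri": "s3://my-bucket/my-key/",  # must match a dataset registered in this Airflow environment
        "extra": {"source": "external_system"},  # optional extra payload attached to the dataset event
    },
    auth=("admin", "admin"),
)
response.raise_for_status()
print(response.json())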
Attaching information to a dataset event
When updating a dataset in the Airflow UI or making a `POST` request to the Airflow REST API, you can attach extra information to the dataset event by providing an `extra` JSON payload. Airflow 2.10 added the possibility to add extra information from within the producing task using either the `Metadata` class or by accessing `outlet_events` from the Airflow context. You can attach any information to the extra that was computed within the task, for example information about the dataset you are working with.

To use the `Metadata` class to attach information to a dataset event, follow the example in the code snippet below. Make sure that the dataset used in the `Metadata` class is also defined as an outlet in the producer task.
- TaskFlow API
- Traditional syntax
# from airflow.decorators import task
# from airflow.datasets import Dataset
# from airflow.datasets.metadata import Metadata

my_dataset_1 = Dataset("x-dataset1")


@task(outlets=[my_dataset_1])
def attach_extra_using_metadata():
    num = 23

    yield Metadata(my_dataset_1, {"myNum": num})

    return "hello :)"


attach_extra_using_metadata()
# from airflow.operators.python import PythonOperator
# from airflow.datasets import Dataset
# from airflow.datasets.metadata import Metadata

my_dataset_1 = Dataset("x-dataset1")


def attach_extra_using_metadata_func():
    num = 23

    yield Metadata(my_dataset_1, {"myNum": num})

    return "hello :)"


attach_extra_using_metadata = PythonOperator(
    task_id="attach_extra_using_metadata",
    python_callable=attach_extra_using_metadata_func,
    outlets=[my_dataset_1],
)
You can also access `outlet_events` from the Airflow context directly to add an extra dictionary to a dataset event.
- TaskFlow API
- Traditional syntax
# from airflow.decorators import task
# from airflow.datasets import Dataset

my_dataset_2 = Dataset("x-dataset2")


@task(outlets=[my_dataset_2])
def use_outlet_events(**context):
    num = 19

    context["outlet_events"][my_dataset_2].extra = {"my_num": num}

    return "hello :)"


use_outlet_events()
# from airflow.operators.python import PythonOperator
# from airflow.datasets import Dataset

my_dataset_2 = Dataset("x-dataset2")


def use_outlet_events_func(**context):
    num = 19

    context["outlet_events"][my_dataset_2].extra = {"my_num": num}

    return "hello :)"


use_outlet_events = PythonOperator(
    task_id="use_outlet_events",
    python_callable=use_outlet_events_func,
    outlets=[my_dataset_2],
)
Dataset extras can be viewed in the Airflow UI in the Dataset Events list on the producing task and the consuming DAG run, as well as in the Datasets tab.
Retrieving dataset information in a downstream task
Extras can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a DAG run has access to the list of datasets that were involved in triggering that specific DAG run (`triggering_dataset_events`). Additionally, you can give any Airflow task access to all dataset events of a specific dataset by providing the dataset to the task's `inlets` parameter. Defining inlets does not affect the schedule of the DAG.

To access all dataset events that were involved in triggering a DAG run within a TaskFlow API task, pull them from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to pull information from the Airflow context.
- TaskFlow API
- Traditional syntax
# from airflow.decorators import task


@task
def get_extra_triggering_run(**context):
    # all events that triggered this specific DAG run
    triggering_dataset_events = context["triggering_dataset_events"]

    # the loop below won't run if the DAG is manually triggered
    for dataset, dataset_list in triggering_dataset_events.items():
        print(dataset, dataset_list)
        print(dataset_list[0].extra)
        # you can also fetch the run_id and other information about the upstream DAGs,
        # note that this will error if the dataset was updated via the API!
        print(dataset_list[0].source_dag_run.run_id)
# from airflow.operators.bash import BashOperator

get_extra_triggering_run_bash = BashOperator(
    task_id="get_extra_triggering_run_bash",
    # This statement errors when there are no triggering events, for example in a manual run!
    bash_command="echo {{ (triggering_dataset_events.values() | first | first).extra['myNum'] }} ",
    # The version below returns an empty string if there are no triggering dataset events or the extra is not present
    # bash_command="echo {{ (triggering_dataset_events.values() | default([]) | first | default({}) | first | default({})).extra.get('myNum', '') if (triggering_dataset_events.values() | default([]) | first | default({}) | first | default({})).extra is defined else '' }}"
)
If you want to access dataset extras independently of which dataset events triggered a DAG run, you can provide a dataset directly to a task as an inlet. In a TaskFlow API task, you can fetch the `inlet_events` from the Airflow context; in a traditional operator, you can use Jinja templating to access them.
- TaskFlow API
- Traditional syntax
# from airflow.decorators import task
# from airflow.datasets import Dataset

my_dataset_2 = Dataset("x-dataset2")


# note that my_dataset_2 does not need to be part of the DAG's schedule
# you can provide as many inlets as you wish
@task(inlets=[my_dataset_2])
def get_extra_inlet(**context):
    # inlet_events are listed earliest to latest by timestamp
    dataset_events = context["inlet_events"][my_dataset_2]

    # protect against the dataset not having any events
    if len(dataset_events) == 0:
        print(f"No dataset_events for {my_dataset_2.uri}")
    else:
        # access the latest dataset event for this dataset;
        # if the extra does not exist, return None
        my_num = dataset_events[-1].extra.get("myNum", None)
        print(my_num)


get_extra_inlet()
# from airflow.operators.bash import BashOperator
# from airflow.datasets import Dataset

my_dataset_2 = Dataset("x-dataset2")

get_extra_inlet_bash = BashOperator(
    task_id="get_extra_inlet_bash",
    inlets=[my_dataset_2],
    # This statement will error if the x-dataset2 dataset has no previous dataset events
    bash_command="echo {{ inlet_events['x-dataset2'][-1].extra['myNum'] }} ",
    # The version below returns an empty string if there are no dataset events or the extra is not present; it errors when the dataset does not exist at all.
    # bash_command="echo {{ (inlet_events['x-dataset2'] | default([]) | last | default({})).extra.get('myNum', '') if (inlet_events['x-dataset2'] | default([]) | last | default({})).extra is defined else '' }}",
)
Note that you can programmatically retrieve information from dataset aliases as well; see Fetching information from previously emitted dataset events through resolved dataset aliases for more information.
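As a hedged sketch of that pattern (Airflow 2.10+), the task below provides a dataset alias as an inlet and reads the extras of the events that were attached to it. The alias name and task name are illustrative, and the exact access pattern may differ between Airflow versions, so check the linked documentation before relying on it.

# from airflow.decorators import task
# from airflow.datasets import DatasetAlias

my_alias_name = "my_alias"


@task(inlets=[DatasetAlias(my_alias_name)])
def get_extra_from_alias(**context):
    # events previously attached to the alias, resolved to their underlying datasets
    alias_events = context["inlet_events"][DatasetAlias(my_alias_name)]

    if len(alias_events) == 0:
        print(f"No dataset events attached to {my_alias_name} yet")
    else:
        # access the extra of the latest event attached to the alias
        print(alias_events[-1].extra)


get_extra_from_alias()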
Dataset schedules
Any number of datasets can be provided to the `schedule` parameter. There are three types of dataset schedules:

- `schedule=[Dataset("a"), Dataset("b")]`: providing one or more datasets as a list. The DAG is scheduled to run after all datasets in the list have received at least one update.
- `schedule=(Dataset("a") | Dataset("b"))`: (Airflow 2.9+) using AND (`&`) and OR (`|`) operators to create a conditional dataset expression. Note that dataset expressions are enclosed in parentheses (`()`).
- `DatasetOrTimeSchedule`: (Airflow 2.9+) combining time-based scheduling with dataset expressions, see Combined dataset and time-based scheduling.
When scheduling DAGs based on datasets, keep the following in mind:
- Consumer DAGs that are scheduled on a dataset are triggered every time a task that updates that dataset completes successfully. For example, if `task1` and `task2` both produce `dataset_a`, a consumer DAG of `dataset_a` runs twice: first when `task1` completes, and again when `task2` completes.
- Consumer DAGs scheduled on a dataset are triggered as soon as the first task with that dataset as an outlet finishes, even if there are downstream producer tasks that also operate on the dataset.
- Consumer DAGs scheduled on multiple datasets run as soon as their expression is fulfilled by at least one dataset event per dataset in the expression. This means that it does not matter to the consuming DAG whether a dataset received additional updates in the meantime; it consumes all queued events for one dataset as one input. See Multiple Datasets for more information.
- As of Airflow 2.10, a consumer DAG that is paused ignores all updates to datasets that occurred while it was paused and starts with a blank slate when it is unpaused. In previous Airflow versions, a consumer DAG scheduled on one dataset that had received an update while the DAG was paused would run immediately when unpaused.
- DAGs that are triggered by datasets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the parameter `triggering_dataset_events` from the context. This parameter provides a list of all the triggering dataset events with the parameters `[timestamp, source_dag_id, source_task_id, source_run_id, source_map_index]`. See Retrieving dataset information in a downstream task for an example.
Conditional dataset scheduling
In Airflow 2.9 and later, you can use logical operators to combine any number of datasets provided to the `schedule` parameter. The logical operators supported are `|` for OR and `&` for AND.

For example, to schedule a DAG on an update to either `dataset1`, `dataset2`, `dataset3`, or `dataset4`, you can use the following syntax. Note that the full statement is wrapped in `()`.
- TaskFlow API
- Traditional syntax
from airflow.decorators import dag
from airflow.datasets import Dataset
from pendulum import datetime


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=(
        Dataset("dataset1")
        | Dataset("dataset2")
        | Dataset("dataset3")
        | Dataset("dataset4")
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
)
def downstream1_on_any():
    # your tasks here
    pass


downstream1_on_any()
from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime

with DAG(
    dag_id="downstream1_on_any",
    start_date=datetime(2024, 3, 1),
    schedule=(
        Dataset("dataset1")
        | Dataset("dataset2")
        | Dataset("dataset3")
        | Dataset("dataset4")
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
):
    # your tasks here
    pass
The `downstream1_on_any` DAG is triggered whenever any of the datasets `dataset1`, `dataset2`, `dataset3`, or `dataset4` are updated. When clicking on x of 4 Datasets updated in the DAGs view, you can see the dataset expression that defines the schedule.
You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either `dataset1` or `dataset2`, and either `dataset3` or `dataset4`, you can use the following syntax:
- TaskFlow API
- Traditional syntax
from airflow.decorators import dag
from airflow.datasets import Dataset
from pendulum import datetime


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=(
        (Dataset("dataset1") | Dataset("dataset2"))
        & (Dataset("dataset3") | Dataset("dataset4"))
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
)
def downstream2_one_in_each_group():
    # your tasks here
    pass


downstream2_one_in_each_group()
from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime

with DAG(
    dag_id="downstream2_one_in_each_group",
    start_date=datetime(2024, 3, 1),
    schedule=(
        (Dataset("dataset1") | Dataset("dataset2"))
        & (Dataset("dataset3") | Dataset("dataset4"))
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
):
    # your tasks here
    pass
The dataset expression this schedule creates is:
{
  "all": [
    {
      "any": [
        "dataset1",
        "dataset2"
      ]
    },
    {
      "any": [
        "dataset3",
        "dataset4"
      ]
    }
  ]
}
Combined dataset and time-based scheduling
In Airflow 2.9 and later, you can combine dataset-based scheduling with time-based scheduling using the `DatasetOrTimeSchedule` timetable. A DAG scheduled with this timetable runs either when its `timetable` condition is met or when its `datasets` condition is met.

The DAG shown below runs on a time-based schedule defined by the `0 0 * * *` cron expression, which is every day at midnight. The DAG also runs when either `dataset3` or `dataset4` is updated.
- TaskFlow API
- Traditional syntax
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(Dataset("dataset3") | Dataset("dataset4")),
        # Use () instead of [] to be able to use conditional dataset scheduling!
    ),
    catchup=False,
)
def toy_downstream3_dataset_and_time_schedule():
    # your tasks here
    pass


toy_downstream3_dataset_and_time_schedule()
from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    dag_id="toy_downstream3_dataset_and_time_schedule",
    start_date=datetime(2024, 3, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(Dataset("dataset3") | Dataset("dataset4")),
        # Use () instead of [] to be able to use conditional dataset scheduling!
    ),
    catchup=False,
):
    # your tasks here
    pass
Example implementation
In the following example, the `write_instructions_to_file` and `write_info_to_file` tasks are both producer tasks because they have defined outlets.
- TaskFlow API
- Traditional syntax
from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
)
def datasets_producer_dag():
    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

        f = open("include/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."

        f = open("include/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)

    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)


datasets_producer_dag()
from pendulum import datetime
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


def get_cocktail_func(api):
    import requests

    r = requests.get(api)
    return r.json()


def write_instructions_to_file_func(response):
    cocktail_name = response["drinks"][0]["strDrink"]
    cocktail_instructions = response["drinks"][0]["strInstructions"]
    msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

    f = open("include/cocktail_instructions.txt", "a")
    f.write(msg)
    f.close()


def write_info_to_file_func(response):
    import time

    time.sleep(30)
    cocktail_name = response["drinks"][0]["strDrink"]
    cocktail_category = response["drinks"][0]["strCategory"]
    alcohol = response["drinks"][0]["strAlcoholic"]
    msg = (
        f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
    )

    f = open("include/cocktail_info.txt", "a")
    f.write(msg)
    f.close()


with DAG(
    dag_id="datasets_producer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
    render_template_as_native_obj=True,
):
    get_cocktail = PythonOperator(
        task_id="get_cocktail",
        python_callable=get_cocktail_func,
        op_kwargs={"api": API},
    )

    write_instructions_to_file = PythonOperator(
        task_id="write_instructions_to_file",
        python_callable=write_instructions_to_file_func,
        op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
        outlets=[INSTRUCTIONS],
    )

    write_info_to_file = PythonOperator(
        task_id="write_info_to_file",
        python_callable=write_info_to_file_func,
        op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
        outlets=[INFO],
    )

    get_cocktail >> write_instructions_to_file >> write_info_to_file
A consumer DAG runs whenever the dataset(s) it is scheduled on are updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the `INSTRUCTIONS` and `INFO` datasets are updated, you define the DAG's schedule using those two datasets.
Any DAG that is scheduled with a dataset is considered a consumer DAG even if that DAG doesn't actually access the referenced dataset. In other words, it's up to you as the DAG author to correctly reference and use datasets.
- TaskFlow API
- Traditional syntax
from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both Datasets
    catchup=False,
)
def datasets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()


datasets_consumer_dag()
from pendulum import datetime
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


def read_about_cocktail_func():
    cocktail = []
    for filename in ("info", "instructions"):
        with open(f"include/cocktail_{filename}.txt", "r") as f:
            contents = f.readlines()
            cocktail.append(contents)

    return [item for sublist in cocktail for item in sublist]


with DAG(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both Datasets
    catchup=False,
):
    PythonOperator(
        task_id="read_about_cocktail",
        python_callable=read_about_cocktail_func,
    )