Cross-deployment dependencies
Cross-DAG dependencies serve a common use case: configuring a DAG to run when a separate DAG or a task in another DAG completes or updates a dataset. But what about situations in which a dataset or DAG you monitor exists in a separate deployment? For example, you might want to make a task dependent on a dataset update in a DAG that is owned by a different team and located in a separate deployment. Astro also supports the orchestration of tasks using this kind of relationship, which is referred to as a cross-deployment dependency.
This guide uses the following terms to describe cross-deployment dependencies:
- Upstream deployment: A deployment where a DAG must reach a specified state before a DAG in another deployment can run.
- Downstream deployment: A deployment in which a DAG cannot run until a DAG in an upstream deployment reaches a specified state.
Cross-deployment dependencies require special implementation because some methods, like the TriggerDagRunOperator, ExternalTaskSensor, and direct Airflow Dataset dependencies, are only designed for DAGs in the same deployment. On Astro, there are two recommended methods available for implementing cross-deployment dependencies: Astro Alerts and triggering updates to Airflow Datasets using the Airflow REST API.
Feature overview
In this guide, you'll learn when to use the following Astro and Airflow features to create dependencies across Astro deployments. Astro supports cross-deployment dependencies in any Workspace or cluster.
- Astro Alerts. Recommended for most Astro use cases as no code modification is required.
- Airflow Datasets. Can be used to trigger a DAG after a task in another DAG updates a dataset.
Best practice guidance
Astro Alerts and Airflow Datasets are the best methods for implementing cross-deployment dependencies. The `dagRuns` endpoint of the Airflow API can also be used for this purpose and might be appropriate when you want tasks that do not update datasets to trigger DAGs. This method is not covered in detail here, but you can implement it by following the guidance in Airflow REST API.
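If you do take this approach, the core of it is a `POST` request to the `dagRuns` endpoint of the downstream Deployment's Airflow API. The following is a minimal sketch only, not a full implementation; it assumes you store a Deployment API token and the downstream Deployment URL in `API_TOKEN` and `DEPLOYMENT_URL` environment variables, and the `consumer_dag` DAG ID is a placeholder:

```python
# Minimal sketch: trigger a DAG run in another Deployment through the dagRuns
# endpoint of the Airflow REST API. The environment variable names and the
# DAG ID are placeholders for this example.
import os

import requests

DEPLOYMENT_URL = os.environ.get("DEPLOYMENT_URL")  # downstream Deployment URL
TOKEN = os.environ.get("API_TOKEN")  # API token with access to the downstream Deployment

response = requests.post(
    url=f"https://{DEPLOYMENT_URL}/api/v1/dags/consumer_dag/dagRuns",
    headers={
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json",
    },
    json={},  # an empty body triggers a run with default settings
)
response.raise_for_status()
print(response.json())
```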
To determine whether an Astro Alert or the Datasets feature is the right solution for your use case, consider the following guidance.
Astro Alerts:
You can use Astro Alerts to implement cross-deployment DAG dependencies using the DAG Trigger communication channel. They are simple to implement and are the preferred method in the following situations:
- If you need to implement a dependency trigger based on any DAG state other than success, such as a DAG failure, a task taking longer than expected, or a DAG not completing by a certain time.
- If you need to implement a simple one-to-one cross-deployment dependency (one upstream DAG triggers one downstream DAG) and do not want to update your DAG code.
- When your DAGs do not already use the Datasets feature and when it is easy to identify the relevant dependent DAGs, which is not always the case in larger organizations.
Airflow Datasets:
Datasets represent a significant evolution in the way Airflow defines dependencies and, for some users, offer a more natural way of expressing pipelines than traditional DAG scheduling. Datasets also offer more flexibility for cross-deployment dependencies than Astro Alerts and are the preferred method in the following scenarios:
- You need to implement dependencies in a many-to-one pattern, so you can make a DAG dependent on the completion of multiple other DAGs or tasks, as shown in the sketch after this list. This is not possible using Astro Alerts.
- You need to implement dependencies in a one-to-many pattern, so one DAG or task triggers multiple DAGs or tasks. While this is also possible using Astro Alerts, it requires a separate alert for each dependency.
Before Airflow 2.9, you could only create updates to Airflow Datasets from within tasks located in the same deployment as the Dataset. As of Airflow 2.9, you can update a Dataset with a `POST` request to the `datasets` endpoint of the Airflow REST API, which supports implementation across deployments.
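For example, a many-to-one dependency is expressed by passing a list of Datasets to the downstream DAG's `schedule` parameter; the DAG runs only after every Dataset in the list has received an update since its last run. The following minimal sketch uses placeholder URIs and a placeholder DAG ID:

```python
from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task


@dag(
    dag_id="multi_upstream_consumer",  # placeholder DAG ID
    start_date=datetime(2023, 12, 1),
    # Runs only after both Datasets have been updated since the last run.
    schedule=[Dataset("file://include/bears"), Dataset("file://include/honey")],
    catchup=False,
)
def multi_upstream_consumer():
    @task
    def report():
        print("All upstream Datasets have been updated.")

    report()


multi_upstream_consumer()
```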
Astro Alerts example
Assumed knowledge
To use Astro Alerts to create cross-deployment dependencies, you should have an understanding of:
- Airflow DAGs. See Introduction to Airflow DAGs.
- Creating and managing Astro deployments. See Create a deployment.
Implementation
This example shows you how to create dependencies between DAGs in different Astro deployments using Astro Alerts.
Prerequisites
- Two Astro Deployments, each containing at least one DAG.
Process
Create a dependency between DAGs in separate deployments with an alert trigger on Astro.
- First, create a Deployment API token for the downstream Deployment, which contains the DAG you want to trigger.
- Click Alerts in the Workspace menu, and create a new alert.
- Enter an Alert name that you'll remember, select the Alert type (like DAG Success), and then select DAG Trigger as the Communication Channel.
- In the Deployment menu for the DAG Trigger communication channel, select the downstream deployment from the list.
- In the DAG NAME menu, select the DAG you want to trigger.
- Paste your API token in the DEPLOYMENT API TOKEN field.
- Run the upstream DAG, verify that the alert triggers, and confirm the downstream DAG runs as expected.
Datasets example
Assumed knowledge
To use Airflow Datasets to create cross-deployment dependencies, you should have an understanding of:
- Airflow DAGs. See Introduction to Airflow DAGs.
- Airflow Datasets.
- The Airflow API.
Implementation
This section explains how to use the Airflow API's Datasets endpoint to trigger the downstream DAG in another deployment when a dataset is updated. Typical dataset implementation only works for DAGs in the same Airflow deployment, but by using the Airflow API, you can implement this pattern across deployments.
Prerequisites
- Two Astro Deployments.
- A Deployment API token, Workspace API token, or Organization API token for one of your deployments. This deployment will host your downstream DAG.
- Two Astro projects.
Process
- In your upstream Deployment (the Deployment for which you did not create an API token), open the Environment Variables tab in the Deployment's Environment settings and create an environment variable for your API token, using `API_TOKEN` as the key.
- Follow the guidance in Make requests to the Airflow REST API - Step 2 to obtain the Deployment URL for your downstream Deployment. The Deployment URL is in the format `clq52ag32000108i8e3v3acml.astronomer.run/dz3uu847`.
- In your upstream Deployment, use Variables in the Astro UI to create a second environment variable that stores your downstream Deployment URL, using `DEPLOYMENT_URL` as the key.
- Add the following DAG to the Astro project running in your upstream Deployment. In the `get_bear` task, the TaskFlow API automatically registers `MY_DATASET` as an outlet Dataset, which creates an update to this Dataset in the same Airflow deployment. The dependent `update_dataset_via_api` task creates or updates the Dataset in a different Airflow deployment through a request to the Airflow API Datasets endpoint.
```python
"""DAG that updates a Dataset locally and, via the Airflow REST API, in a separate Deployment."""

import os

import requests
from airflow.datasets import Dataset
from airflow.decorators import dag, task
from pendulum import datetime

URI = "file://include/bears"
MY_DATASET = Dataset(URI)

TOKEN = os.environ.get("API_TOKEN")
DEPLOYMENT_URL = os.environ.get("DEPLOYMENT_URL")


@dag(
    start_date=datetime(2023, 12, 1),
    schedule="0 0 * * 0",
    catchup=False,
    doc_md=__doc__,
)
def producer_dag():
    @task
    def get_bear():
        print("Update the bears dataset")
        return MY_DATASET

    @task
    def update_dataset_via_api(dataset: Dataset):
        print("Oh! This is the bears dataset!")

        payload = {"dataset_uri": dataset.uri}

        # Create a dataset event in the downstream Deployment through the Airflow REST API.
        response = requests.post(
            url=f"https://{DEPLOYMENT_URL}/api/v1/datasets/events",
            headers={
                "Authorization": f"Bearer {TOKEN}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            json=payload,
        )
        print(response.json())

    bear = get_bear()
    update_dataset_via_api(bear)


producer_dag()
```
- Deploy the project to Astro.
- Deploy a DAG of any kind to your downstream Deployment. In this DAG, use a `dag_id` of `consumer_dag` and schedule it on the same Dataset used in the `producer_dag`.
For example:
```python
"""DAG that runs whenever the bears Dataset receives an update."""

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

URI = "file://include/bears"


@dag(
    dag_id="consumer_dag",
    start_date=datetime(2023, 12, 1),
    schedule=[Dataset(URI)],
    catchup=False,
    doc_md=__doc__,
)
def consumer_dag():
    @task
    def wait_for_bears():
        print("The bears are here!")

    wait_for_bears()


consumer_dag()
```
After deploying both projects to their respective Deployments on Astro, runs of your `producer_dag` should trigger your `consumer_dag` automatically. If the downstream DAG does not run, verify that the API token and Deployment URL stored in the upstream Deployment's environment variables match the downstream Deployment's API token and Deployment URL. A successful request to the Airflow API also returns a payload containing DAG run information related to the Dataset, so check the logs of the `update_dataset_via_api` task for output similar to the following:

```text
[2024-05-13, 11:21:06 UTC] {logging_mixin.py:188} INFO - {'created_dagruns': [], 'dataset_id': 1, 'dataset_uri': 'file://include/bears', 'extra': {'from_rest_api': True}, 'id': 4, 'source_dag_id': None, 'source_map_index': -1, 'source_run_id': None, 'source_task_id': None, 'timestamp': '2024-05-13T11:21:06.252976+00:00'}
```
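You can also confirm that dataset events are reaching the downstream Deployment by querying its Airflow REST API directly. The following is a minimal sketch, assuming you export the same `API_TOKEN` and `DEPLOYMENT_URL` values locally and that your Airflow version exposes the Datasets endpoints:

```python
# Minimal sketch: list dataset events recorded in the downstream Deployment and
# filter them client-side by URI. Environment variable names are placeholders.
import os

import requests

DEPLOYMENT_URL = os.environ.get("DEPLOYMENT_URL")
TOKEN = os.environ.get("API_TOKEN")
URI = "file://include/bears"

response = requests.get(
    url=f"https://{DEPLOYMENT_URL}/api/v1/datasets/events",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()

for event in response.json().get("dataset_events", []):
    if event.get("dataset_uri") == URI:
        print(event["timestamp"], event.get("extra"))
```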
See also
- Creating cross-DAG dependencies.
- Setting up alerts.