Set up a custom XCom backend using object storage
By default, Airflow uses the metadata database to store XComs, which works well for local development but has limited performance. For production environments that use XCom to pass data between tasks, Astronomer recommends using a custom XCom backend. Custom XCom backends allow you to configure where Airflow stores information that is passed between tasks using XComs.
The Object Storage XCom Backend available in the Common IO provider is the easiest way to store XComs in a remote object storage solution.
This tutorial will show you how to set up a custom XCom backend using object storage for AWS S3, GCP Cloud Storage, or Azure Blob Storage.
To learn more about other options for setting custom XCom backends, see Strategies for custom XCom backends in Airflow.
While a custom XCom backend allows you to store virtually unlimited amounts of data as XComs, you will also need to scale other Airflow components to pass large amounts of data between tasks. For help running Airflow at scale, reach out to Astronomer.
Object storage is currently considered experimental and might be subject to breaking changes in future releases. For more information, see AIP-58.
Time to complete
This tutorial takes approximately 45 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- XCom basics. See Passing data between Airflow tasks.
- Airflow connections. See Manage connections in Apache Airflow.
Prerequisites
- The Astro CLI with an Astro project running Astro Runtime 11.5.0 or higher (Airflow 2.9.2 or higher). To set up a custom XCom backend with older versions of Airflow, see Custom XCom backends.
- An account in either AWS, GCP, or Azure with permissions to create and configure an object storage container.
Step 1: Set up your object storage container
First, you need to set up the object storage container in your cloud provider where Airflow will store the XComs.
Follow the steps for the object storage solution you want to use: AWS S3, GCP Cloud Storage, or Azure Blob Storage.

AWS S3

1. Log into your AWS account and create a new S3 bucket. Ensure that public access to the bucket is blocked. You do not need to enable bucket versioning.
2. Create a new IAM policy for Airflow to access your bucket. You can use the JSON configuration below or create an equivalent policy in the AWS console. Replace `<your-bucket-name>` with the name of your S3 bucket.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:ReplicateObject",
                "s3:PutObject",
                "s3:GetObject",
                "s3:RestoreObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>/*",
                "arn:aws:s3:::<your-bucket-name>"
            ]
        }
    ]
}
```
3. Save your policy under the name `AirflowXComBackendAWSS3`.

4. Create an IAM user called `airflow-xcom` with the AWS credential type `Access key - Programmatic access` and attach the `AirflowXComBackendAWSS3` policy to this user.

5. Create an access key of the type `Third-party service` for your `airflow-xcom` user. Make sure to save the Access Key ID and the Secret Access Key in a secure location to use in Step 3.

For other ways to set up a connection between Airflow and AWS, see the Amazon provider documentation.
GCP Cloud Storage

1. Log into your Google Cloud account and create a new project.

2. Create a new bucket in your project with uniform access control enabled. Enforce public access prevention.

3. Create a custom IAM role called `AirflowXComBackendGCS` for Airflow to access your bucket. Assign the following 6 permissions:

- storage.buckets.list
- storage.objects.create
- storage.objects.delete
- storage.objects.get
- storage.objects.list
- storage.objects.update

4. Create a new service account called `airflow-xcom` and grant it access to your project by assigning it the `AirflowXComBackendGCS` role.

5. Create a new key for your `airflow-xcom` service account and download the credentials in JSON format.

For other ways to set up a connection between Airflow and Google Cloud, see the Google provider documentation.
Azure Blob Storage

1. Log into your Azure account and create a storage account. Ensure that public access to the storage account is blocked.

2. In the storage account, create a new container.

3. Create a shared access token for your container. In the Permissions dropdown menu, enable the following permissions:

- Read
- Add
- Create
- Write
- Delete
- List

Set the duration the token will be valid and set Allowed Protocols to `HTTPS only`. Provide the IP address of your Airflow instance. If you are running Airflow locally with the Astro CLI, use the IP address of your computer.

4. Go to your storage account and navigate to Access keys. Copy the Key and Connection string values and save them in a secure location to use in Step 3.

For other ways to set up a connection between Airflow and Azure Blob Storage, see the Microsoft Azure provider documentation.
Step 2: Install the required provider packages
To use the Object Storage XCom Backend, you need to install the Common IO provider package and the provider package for your object storage container provider.
Choose the packages for your object storage provider.

AWS S3

Add the Common IO and Amazon provider packages to your `requirements.txt` file. Note that you need to install the `s3fs` extra to use the Amazon provider package with the object storage feature.

```text
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-amazon[s3fs]==8.19.0
```
GCP Cloud Storage

Add the Common IO and Google provider packages to your `requirements.txt` file.

```text
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-google==10.17.0
```
Azure Blob Storage

Add the Common IO and Microsoft Azure provider packages to your `requirements.txt` file.

```text
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-microsoft-azure==9.0.0
```
Step 3: Set up your Airflow connection
An Airflow connection is necessary to connect Airflow with your object storage container provider. In this tutorial, you'll use the Airflow UI to configure your connection.
1. Start your Astro project by running:

```sh
astro dev start
```

2. In the Airflow UI, navigate to `Admin` -> `Connections` and click `Create`. Fill in the following fields for your object storage provider.
AWS S3

- Conn Id: `my_aws_conn`
- Conn Type: `Amazon Web Services`
- AWS Access Key ID: `<your access key>`
- AWS Secret Access Key: `<your secret key>`

To learn more about configuration options for the AWS connection, see the Amazon provider documentation.
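Alternatively, you can define the same AWS connection as an environment variable instead of creating it in the Airflow UI. The following is a minimal sketch using Airflow's JSON connection format for the connection ID `my_aws_conn`; replace the placeholder values with the access key you created in Step 1, and adjust the quoting if your environment loads `.env` files differently.

```text
# Hypothetical example: defines the my_aws_conn connection from an environment variable
AIRFLOW_CONN_MY_AWS_CONN='{"conn_type": "aws", "login": "<your access key>", "password": "<your secret key>"}'
```

Note that connections defined through environment variables are not visible in the Airflow UI.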
GCP Cloud Storage

- Conn Id: `my_gcp_conn`
- Conn Type: `Google Cloud`
- Project Id: `<your project id>`
- Keyfile JSON: `<the contents of the keyfile JSON you downloaded in Step 1>`

To learn more about configuration options for the Google connection, see the Google provider documentation.
Azure Blob Storage

- Conn Id: `my_azure_conn`
- Conn Type: `Microsoft Azure Blob Storage`
- Account URL (Active Directory Auth): `<the URL of your Azure Storage account>`
- Blob Storage Key (optional): `<access key to your Azure Storage account>`
- Blob Storage Connection String (optional): `<connection string to your Azure Storage account>`

To learn more about configuration options for the Azure connection, see the Microsoft Azure provider documentation.
Step 4: Configure your custom XCom backend
To configure a custom XCom backend with object storage, set environment variables in your Astro project.
If you are setting up a custom XCom backend for an Astro deployment, you have to set the following environment variables for your deployment. See Environment variables for instructions.
1. Add the `AIRFLOW__CORE__XCOM_BACKEND` environment variable to your `.env` file. It defines the class to use for the custom XCom backend implementation.

```text
AIRFLOW__CORE__XCOM_BACKEND="airflow.providers.common.io.xcom.backend.XComObjectStorageBackend"
```

2. Add the `AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH` environment variable to your `.env` file for your object storage provider.
AWS S3

Define the path in your S3 bucket where the XComs will be stored, in the form `<connection id>@<bucket name>/<path>`. Use the connection ID of the Airflow connection you defined in Step 3 and replace `<my-bucket>` with your S3 bucket name.

```text
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="s3://my_aws_conn@<my-bucket>/xcom"
```
GCP Cloud Storage

Define the path in your GCS bucket where the XComs will be stored, in the form `<connection id>@<bucket name>/<path>`. Use the connection ID of the Airflow connection you defined in Step 3 and replace `<my-bucket>` with your GCS bucket name.

```text
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="gs://my_gcp_conn@<my-bucket>/xcom"
```
Azure Blob Storage

Define the path in your Azure Blob Storage container where the XComs will be stored, in the form `<connection id>@<container name>/<path>`. Use the connection ID of the Airflow connection you defined in Step 3 and replace `<my-blob>` with the name of your container.

```text
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="azblob://my_azure_conn@<my-blob>/xcom"
```
3. Add the `AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD` environment variable to your `.env` file to determine when Airflow stores XComs in the object storage vs. the metadata database. The default value is `-1`, which stores all XComs in the metadata database. Set the value to `0` to store all XComs in the object storage. With any positive value, XComs with a byte size greater than the threshold are stored in the object storage, and XComs with a size equal to or smaller than the threshold are stored in the metadata database. For this tutorial, set the threshold to `1000` bytes, which means any XCom larger than 1KB is stored in the object storage.

```text
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD="1000"
```
4. Optional. Define the `AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION` environment variable to compress the XComs stored in the object storage with an fsspec-supported compression algorithm such as `zip`. The default value is `None`. A consolidated example `.env` configuration is shown after this list.

```text
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION="zip"
```
5. Restart your Airflow project by running:

```sh
astro dev restart
```
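With all variables set, the relevant entries in your `.env` file for the AWS setup in this tutorial look like the following example. Replace `<my-bucket>` with your bucket name.

```text
# Use the object storage XCom backend from the Common IO provider
AIRFLOW__CORE__XCOM_BACKEND="airflow.providers.common.io.xcom.backend.XComObjectStorageBackend"
# Store XComs under the xcom/ prefix in your S3 bucket, using the my_aws_conn connection
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="s3://my_aws_conn@<my-bucket>/xcom"
# Keep XComs of 1000 bytes or less in the metadata database, send larger ones to S3
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD="1000"
# Optional: compress serialized XComs before uploading them
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION="zip"
```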
Step 5: Test your custom XCom backend
We will use a simple DAG to test your custom XCom backend.
1. Create a new file in the `dags` directory of your Astro project called `custom_xcom_backend_test.py` and add the following code:

```python
"""
## Toy DAG to show size dependent custom XCom serialization

This DAG pushes two dicts to XCom, one below and one above 1000 bytes.
It then pulls them and prints their sizes.
"""

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain


@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    doc_md=__doc__,
    tags=["xcom", "2-9", "toy"],
)
def custom_xcom_backend_test():
    @task
    def push_objects(**context) -> None:
        """Create a small and a big dictionary, print their sizes and push them to XCom."""
        small_obj = {"a": 23}
        big_obj = {f"key{i}": "x" * 100 for i in range(100)}

        print(f"Size of small object: {small_obj.__sizeof__()}")
        print(f"Size of big object: {big_obj.__sizeof__()}")

        context["ti"].xcom_push(key="small_obj", value=small_obj)
        context["ti"].xcom_push(key="big_obj", value=big_obj)

    @task
    def pull_objects(**context) -> None:
        """Pull the small and big dictionaries from XCom and print their sizes."""
        small_obj = context["ti"].xcom_pull(task_ids="push_objects", key="small_obj")
        big_obj = context["ti"].xcom_pull(task_ids="push_objects", key="big_obj")

        print(f"Size of small object: {small_obj.__sizeof__()}")
        print(f"Size of big object: {big_obj.__sizeof__()}")

    chain(push_objects(), pull_objects())


custom_xcom_backend_test()
```
2. Manually trigger the `custom_xcom_backend_test` DAG in the Airflow UI and navigate to the XCom tab of the `push_objects` task. You should see that the `small_obj` XCom shows its value, meaning it was stored in the metadata database, since it is smaller than 1KB. The `big_obj` XCom shows the path to the object in the object storage containing the serialized value of the XCom.
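To double-check that the serialized values actually landed in your bucket, you can list the objects under the configured XCom path. The following is a minimal sketch using Airflow's object storage API inside a DAG, assuming the AWS setup from this tutorial; `my_aws_conn` and `<my-bucket>` are the connection ID and bucket placeholder used above.

```python
from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath


@dag(start_date=None, schedule=None, catchup=False, tags=["xcom", "toy"])
def list_xcom_objects():
    @task
    def list_objects() -> None:
        # Point at the same path you configured in AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH,
        # replacing <my-bucket> with your S3 bucket name
        base = ObjectStoragePath("s3://my_aws_conn@<my-bucket>/xcom/")
        # Each XCom stored in the object storage appears as one object under this prefix
        for obj in base.iterdir():
            print(obj)

    list_objects()


list_xcom_objects()
```

You can also browse the bucket or container directly in your cloud provider's console and look for new objects under the `xcom/` prefix.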
Conclusion
Congratulations, you learned how to set up a custom XCom backend using object storage! Learn more about other options to set up custom XCom backends in the Strategies for custom XCom backends in Airflow guide.