
Set up a custom XCom backend using object storage

By default, Airflow uses the metadata database to store XComs, which works well for local development but has limited performance. For production environments that use XCom to pass data between tasks, Astronomer recommends using a custom XCom backend. Custom XCom backends allow you to configure where Airflow stores information that is passed between tasks using XComs.

The Object Storage XCom Backend available in the Common IO provider is the easiest way to store XComs in a remote object storage solution.

This tutorial shows you how to set up a custom XCom backend using object storage for AWS S3, GCP Cloud Storage, or Azure Blob Storage.

To learn more about other options for setting custom XCom backends, see Strategies for custom XCom backends in Airflow.

warning

While a custom XCom backend allows you to store virtually unlimited amounts of data as XComs, you will also need to scale other Airflow components to pass large amounts of data between tasks. For help running Airflow at scale, reach out to Astronomer.

warning

Object storage is currently considered experimental and might be subject to breaking changes in future releases. For more information, see AIP-58.

Time to complete

This tutorial takes approximately 45 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

  • The Astro CLI with an Astro project running Astro Runtime 11.5.0 or higher (Airflow 2.9.2 or higher). To set up a custom XCom backend with older versions of Airflow, see Custom XCom backends.
  • An account in either AWS, GCP, or Azure with permissions to create and configure an object storage container.

Step 1: Set up your object storage container

First, you need to set up the object storage container in your cloud provider where Airflow will store the XComs.

  1. Log into your AWS account and create a new S3 bucket. Ensure that public access to the bucket is blocked. You do not need to enable bucket versioning.

  2. Create a new IAM policy for Airflow to access your bucket. You can use the JSON configuration below or create an equivalent policy in the AWS console. Replace <your-bucket-name> with the name of your S3 bucket.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "VisualEditor0",
          "Effect": "Allow",
          "Action": [
            "s3:ReplicateObject",
            "s3:PutObject",
            "s3:GetObject",
            "s3:RestoreObject",
            "s3:ListBucket",
            "s3:DeleteObject"
          ],
          "Resource": [
            "arn:aws:s3:::<your-bucket-name>/*",
            "arn:aws:s3:::<your-bucket-name>"
          ]
        }
      ]
    }
  3. Save your policy under the name AirflowXComBackendAWSS3.

  4. Create an IAM user called airflow-xcom with the AWS credential type Access key - Programmatic access and attach the AirflowXComBackendAWSS3 policy to this user.

  5. Create an access key of the type Third-party-service for your airflow-xcom user. Make sure to save the Access Key ID and the Secret Access Key in a secure location to use in Step 3.
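Optionally, you can verify the new credentials before wiring them into Airflow. The following sketch uses boto3, which is not otherwise required for this tutorial, to write, read, and delete a test object. The bucket name and key placeholders are assumptions you need to replace with your own values.

    # Hypothetical sanity check for the airflow-xcom credentials created above.
    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="<your access key id>",
        aws_secret_access_key="<your secret access key>",
    )

    bucket = "<your-bucket-name>"

    # Write a small test object, read it back, then clean it up.
    s3.put_object(Bucket=bucket, Key="xcom/connection_test.txt", Body=b"hello")
    print(s3.get_object(Bucket=bucket, Key="xcom/connection_test.txt")["Body"].read())
    s3.delete_object(Bucket=bucket, Key="xcom/connection_test.txt")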

info

For other ways to set up a connection between Airflow and AWS, see the Amazon provider documentation.

Step 2: Install the required provider packages

To use the Object Storage XCom Backend, you need to install the Common IO provider package and the provider package for your object storage provider.

Add the Common IO and Amazon provider packages to your requirements.txt file. Note that you need to install the s3fs extra to use the Amazon provider package with the object storage feature.

apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-amazon[s3fs]==8.19.0

Step 3: Set up your Airflow connection

Airflow needs a connection to access your object storage provider. In this tutorial, you'll use the Airflow UI to configure your connection.

  1. Start your Astro project by running:

    astro dev start
  2. In the Airflow UI, navigate to Admin -> Connections and click + to add a new connection. Fill in the following fields:

    • Conn Id: my_aws_conn
    • Conn Type: Amazon Web Services
    • AWS Access Key ID: <your access key>
    • AWS Secret Access Key: <your secret key>

    To learn more about configuration options for the AWS connection, see the Amazon provider documentation.
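Optionally, you can confirm that the connection reaches your bucket with a short check using the Amazon provider's S3Hook. This is a sketch that assumes the my_aws_conn connection id from above; it needs to run inside your Airflow environment, for example in a throwaway task or a Python shell opened with astro dev bash.

    # Optional connection check using the connection id configured above.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    hook = S3Hook(aws_conn_id="my_aws_conn")
    # Returns True if the credentials in my_aws_conn can see the bucket.
    print(hook.check_for_bucket("<my-bucket>"))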

Step 4: Configure your custom XCom backend

To configure a custom XCom backend with object storage, set environment variables in your Astro project.

info

If you are setting up a custom XCom backend for an Astro deployment, you have to set the following environment variables for your deployment. See Environment variables for instructions.

  1. Add the AIRFLOW__CORE__XCOM_BACKEND environment variable to your .env file. It defines the class to use for the custom XCom backend implementation.

    AIRFLOW__CORE__XCOM_BACKEND="airflow.providers.common.io.xcom.backend.XComObjectStorageBackend"
  2. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH environment variable to your .env file to define the path in your S3 bucket where the XComs will be stored, in the form <connection id>@<bucket name>/<path>. Use the connection id of the Airflow connection you defined in Step 3 and replace <my-bucket> with your S3 bucket name.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="s3://my_aws_conn@<my-bucket>/xcom"
  3. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD environment variable to your .env file to determine when Airflow stores an XCom in the object storage instead of the metadata database. The default value of -1 stores all XComs in the metadata database, and a value of 0 stores all XComs in the object storage. Any positive value means XComs with a serialized size greater than the threshold (in bytes) are stored in the object storage, while XComs at or below the threshold are stored in the metadata database. For this tutorial, set the threshold to 1000 bytes, so any XCom larger than 1000 bytes is stored in the object storage. (A simplified sketch of this decision logic follows this list.)

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD="1000"
  4. Optional. Define the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION environment variable to compress the XComs stored in the object storage with an fsspec-supported compression algorithm such as zip. The default value is None.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION="zip"
  5. Restart your Airflow project by running:

    astro dev restart
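To make the threshold behavior concrete, the following simplified sketch illustrates the decision: the serialized size of an XCom value is compared to the configured threshold. This is an illustration of the documented behavior, not the provider's actual implementation.

    # Simplified illustration of the threshold logic described above.
    import json

    THRESHOLD = 1000  # AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD


    def storage_target(value) -> str:
        """Return where an XCom value would be stored under this threshold."""
        serialized_size = len(json.dumps(value).encode("utf-8"))
        if THRESHOLD < 0 or serialized_size <= THRESHOLD:
            return "metadata database"
        return "object storage"


    print(storage_target({"a": 23}))  # metadata database
    print(storage_target({f"key{i}": "x" * 100 for i in range(100)}))  # object storage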

Step 5: Test your custom XCom backend

We will use a simple DAG to test your custom XCom backend.

  1. Create a new file in the dags directory of your Astro project called custom_xcom_backend_test.py and add the following code:

    """
    ## Toy DAG to show size dependant custom XCom serialization

    This DAG pushes two dicts to XCom, one below, one above 1000 bytes.
    It then pulls them and prints their sizes.
    """

    from airflow.decorators import dag, task
    from airflow.models.baseoperator import chain


    @dag(
    start_date=None,
    schedule=None,
    catchup=False,
    doc_md=__doc__,
    tags=["xcom", "2-9", "toy"],
    )
    def custom_xcom_backend_test():
    @task
    def push_objects(**context) -> None:
    """Create a small and a big dictionary, print their sizes and push them to XCom."""

    small_obj = {"a": 23}
    big_obj = {f"key{i}": "x" * 100 for i in range(100)}
    print(f"Size of small object: {small_obj.__sizeof__()}")
    print(f"Size of big object: {big_obj.__sizeof__()}")

    context["ti"].xcom_push(key="small_obj", value=small_obj)
    context["ti"].xcom_push(key="big_obj", value=big_obj)

    @task
    def pull_objects(**context) -> None:
    """Pull the small and big dictionaries from XCom and print their sizes."""

    small_obj = context["ti"].xcom_pull(task_ids="push_objects", key="small_obj")
    big_obj = context["ti"].xcom_pull(task_ids="push_objects", key="big_obj")

    print(f"Size of small object: {small_obj.__sizeof__()}")
    print(f"Size of big object: {big_obj.__sizeof__()}")

    chain(push_objects(), pull_objects())


    custom_xcom_backend_test()
  2. Manually trigger the custom_xcom_backend_test DAG in the Airflow UI and navigate to the XCom tab of the push_objects task. You should see that the small_obj XCom shows its value, meaning it was stored in the metadata database because it is smaller than 1000 bytes. The big_obj XCom shows the path to the object in the object storage that contains the serialized value of the XCom.

    Screenshot: XCom tab of the push_objects task showing two key-value pairs, with big_obj serialized to the custom XCom backend and small_obj, a dictionary containing 'a': 23, stored in the metadata database.
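You can also confirm that the large XCom was written to your bucket by listing the keys under the configured path. The sketch below reuses the my_aws_conn connection and your bucket name, and needs to run inside your Airflow environment.

    # Optional: list the objects the XCom backend wrote under the xcom/ prefix.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    hook = S3Hook(aws_conn_id="my_aws_conn")
    for key in hook.list_keys(bucket_name="<my-bucket>", prefix="xcom/"):
        print(key)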

Conclusion

Congratulations, you learned how to set up a custom XCom backend using object storage! Learn more about other options to set up custom XCom backends in the Strategies for custom XCom backends in Airflow guide.
