Clean up the Airflow metadata database using DAGs

In addition to storing configurations about your Airflow environment, the Airflow metadata database stores data about past and present task runs. Airflow never automatically removes metadata, so the longer you use it, the more task run data is stored in your metadata DB. Over a long enough time, this can result in a bloated metadata DB, which can affect performance across your Airflow environment.

When a table in the metadata DB is larger than 50GiB, you might start to experience degraded scheduler performance. This can result in:

  • Slow task scheduling
  • Slow DAG parsing
  • Gunicorn timing out when using the Celery executor
  • Slower Airflow UI load times

The following tables in the database are at risk of becoming too large over time:

  • dag_run
  • job
  • log
  • rendered_task_instance_fields
  • task_instance
  • xcom
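One way to see whether any of these tables is approaching the 50GiB mark is to compare per-table sizes against that threshold. The following is a minimal sketch that assumes a PostgreSQL metadata DB. The query string, the oversized_tables helper, and the sample sizes are illustrative and are not part of Airflow.

```python
GIB = 1024**3
SIZE_THRESHOLD_BYTES = 50 * GIB  # the 50GiB threshold mentioned above

# Assumption: the metadata DB is PostgreSQL. This read-only query lists the
# total size (table + indexes + TOAST) of every user table in bytes.
TABLE_SIZE_SQL = """
SELECT relname, pg_total_relation_size(relid) AS total_bytes
FROM pg_catalog.pg_statio_user_tables
ORDER BY total_bytes DESC
"""


def oversized_tables(
    sizes_in_bytes: dict[str, int],
    threshold: int = SIZE_THRESHOLD_BYTES,
) -> list[str]:
    """Return the names of tables larger than the threshold, largest first."""
    over = [name for name, size in sizes_in_bytes.items() if size > threshold]
    return sorted(over, key=lambda name: sizes_in_bytes[name], reverse=True)


# Hypothetical sizes, as they might be returned by TABLE_SIZE_SQL.
sample_sizes = {"task_instance": 62 * GIB, "xcom": 3 * GIB, "log": 51 * GIB}
print(oversized_tables(sample_sizes))
```

The helper only compares numbers, so it works with sizes gathered by any means, for example from your database provider's monitoring tools.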

To keep your Airflow environment running at optimal performance, you can clean the metadata DB using the Airflow CLI command airflow db clean. This command was created as a way to safely clean up your metadata DB without querying it directly. This tutorial describes how to implement a cleanup DAG in Airflow so that you can run the command directly from the Airflow UI.
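To make the shape of the command concrete, the following sketch assembles an airflow db clean invocation from a few inputs and prints it without running it. The build_db_clean_command helper is hypothetical and not part of Airflow. The flags are the same ones the tutorial DAG passes, and the helper defaults to a dry run.

```python
import shlex
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_db_clean_command(
    clean_before: datetime,
    tables: Optional[list[str]] = None,
    dry_run: bool = True,
    skip_archive: bool = False,
) -> list[str]:
    """Return the argument list for `airflow db clean` (illustrative helper)."""
    cmd = [
        "airflow", "db", "clean",
        "--clean-before-timestamp", clean_before.isoformat(),
    ]
    if tables:
        # The CLI takes table names as a single comma-separated value.
        cmd += ["--tables", ",".join(tables)]
    if dry_run:
        cmd.append("--dry-run")
    if skip_archive:
        cmd.append("--skip-archive")
    cmd += ["--verbose", "--yes"]
    return cmd


cutoff = datetime.now(tz=timezone.utc) - timedelta(days=90)
# Print the command for review instead of executing it.
print(shlex.join(build_db_clean_command(cutoff, tables=["xcom", "log"])))
```

Building the command as a list and only printing it keeps the example free of side effects. Review the output before running anything against a real metadata DB.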

danger

Even when using airflow db clean, deleting data from the metadata database can destroy important data. Read the Warnings section carefully before implementing this tutorial DAG in any production Airflow environment.

Warnings

Deleting data from the metadata database can be an extremely destructive action. If you delete data that future task runs depend on, it's difficult to restore the database to its previous state without interrupting your data pipelines. Before implementing the DAG in this tutorial, consider the following:

  • When specifying the clean_before_timestamp value, use as old a date as possible. The older the deleted data, the less likely it is to affect your currently running DAGs.
  • By default, the DAG runs the cleanup with the --skip-archive option, so it does not keep the archive tables created during the cleanup process and does not maintain any history. If the cleanup task fails, for example because it runs for longer than five minutes, the archive tables are not cleared. Running airflow db drop-archived in the final task of the DAG ensures that all archive tables are dropped even if the cleanup task fails.

Prerequisites

  • An Airflow project
  • The Astro CLI

Step 1: Create your DAG

warning

This DAG has been designed and optimized for Airflow environments running on Astro. Consider adjusting the parameters and code if you're running the DAG in any other type of Airflow environment.

  1. In your dags folder, create a file called db_cleanup.py.

  2. Copy the following code into the file.

    """A DB Cleanup DAG maintained by Astronomer."""

    import logging
    from datetime import UTC, datetime, timedelta
    from typing import Optional, List

    from sqlalchemy import func
    from sqlalchemy.orm import Session

    from airflow.cli.commands.db_command import all_tables
    from airflow.decorators import dag, task
    from airflow.models.param import Param
    from airflow.operators.bash import BashOperator
    from airflow.utils.db import reflect_tables
    from airflow.utils.db_cleanup import _effective_table_names
    from airflow.utils.session import provide_session, NEW_SESSION


    @dag(
        dag_id="astronomer_db_cleanup_dag",
        schedule_interval=None,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        is_paused_upon_creation=False,
        description=__doc__,
        doc_md=__doc__,
        render_template_as_native_obj=True,
        max_active_tasks=1,
        tags=["cleanup"],
        params={
            "clean_before_timestamp": Param(
                default=datetime.now(tz=UTC) - timedelta(days=90),
                type="string",
                format="date-time",
                description="Delete records older than this timestamp. Default is 90 days ago.",
            ),
            "tables": Param(
                default=[],
                type=["null", "array"],
                examples=all_tables,
                description="List of tables to clean. Default is all tables.",
            ),
            "dry_run": Param(
                default=False,
                type="boolean",
                description="Print the SQL queries that would be run, but do not execute them. Default is False.",
            ),
            "batch_size_days": Param(
                default=7,
                type="integer",
                description="Number of days in each batch for the cleanup. Default is 7 days.",
            ),
        },
    )
    def astronomer_db_cleanup_dag():

        @provide_session
        def get_oldest_timestamp(
            tables,
            session: Session = NEW_SESSION,
        ) -> Optional[str]:
            oldest_timestamp_list = []
            existing_tables = reflect_tables(tables=None, session=session).tables
            _, effective_config_dict = _effective_table_names(table_names=tables)
            for table_name, table_config in effective_config_dict.items():
                if table_name in existing_tables:
                    orm_model = table_config.orm_model
                    recency_column = table_config.recency_column
                    oldest_execution_date = (
                        session.query(func.min(recency_column))
                        .select_from(orm_model)
                        .scalar()
                    )
                    if oldest_execution_date:
                        oldest_timestamp_list.append(oldest_execution_date.isoformat())
                    else:
                        logging.info(f"No data found for {table_name}, skipping...")
                else:
                    logging.warning(f"Table {table_name} not found. Skipping.")

            if oldest_timestamp_list:
                return min(oldest_timestamp_list)

        @task
        def get_chunked_timestamps(params) -> List:
            batches = []
            start_chunk_time = get_oldest_timestamp(params["tables"])
            if start_chunk_time:
                start_ts = datetime.fromisoformat(start_chunk_time)
                end_ts = datetime.fromisoformat(params["clean_before_timestamp"])
                batch_size_days = params["batch_size_days"]

                while start_ts < end_ts:
                    batch_end = min(start_ts + timedelta(days=batch_size_days), end_ts)
                    batches.append({"BATCH_TS": batch_end.isoformat()})
                    start_ts += timedelta(days=batch_size_days)
            return batches

        # The "clean_archive_tables" task drops archived tables created by the upstream
        # "db_cleanup" task, in case that task fails due to an error or timeout.
        db_archive_cleanup = BashOperator(
            task_id="clean_archive_tables",
            bash_command="""\
                airflow db drop-archived \
                {% if params.tables -%}
                --tables {{ params.tables|join(',') }} \
                {% endif -%}
                --yes \
            """,
            do_xcom_push=False,
            trigger_rule="all_done",
        )

        chunked_timestamps = get_chunked_timestamps()

        (
            BashOperator.partial(
                task_id="db_cleanup",
                bash_command="""\
                    airflow db clean \
                    --clean-before-timestamp $BATCH_TS \
                    {% if params.dry_run -%}
                    --dry-run \
                    {% endif -%}
                    --skip-archive \
                    {% if params.tables -%}
                    --tables '{{ params.tables|join(',') }}' \
                    {% endif -%}
                    --verbose \
                    --yes \
                """,
                append_env=True,
                do_xcom_push=False,
            ).expand(env=chunked_timestamps)
            >> db_archive_cleanup
        )


    astronomer_db_cleanup_dag()

    Rather than running on a schedule, this DAG is triggered manually by default and includes params so that you're in full control over how you clean the metadata DB.

    It includes the following tasks:

    • get_chunked_timestamps, that splits the cleanup window into batches
    • db_cleanup, that runs airflow db clean once for each batch
    • clean_archive_tables, that runs airflow db drop-archived

    These tasks run with params you specify at runtime. The params let you specify:

    • What age of data to delete. Any data that was created before the specified time will be deleted. The default is to delete all data older than 90 days.
    • Whether to run the cleanup as a dry run, meaning that no data is deleted. The DAG will instead return the SQL that would be executed based on other parameters you have specified. The default is to run the deletion without a dry run.
    • Which tables to delete data from. The default is all tables.
    • How many days of data each cleanup batch covers. The default is 7 days.
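The batching performed by the get_chunked_timestamps task in the DAG code can be sketched as a standalone function. This is a simplified illustration rather than the DAG's exact code: the chunk_timestamps name and the sample dates are made up, and the guard against a non-positive batch size is an addition.

```python
from datetime import datetime, timedelta


def chunk_timestamps(
    oldest: datetime,
    clean_before: datetime,
    batch_size_days: int,
) -> list[str]:
    """Return the ISO 8601 cutoff for each cleanup batch, oldest first."""
    if batch_size_days <= 0:
        # Added guard: a zero or negative step would never reach clean_before.
        raise ValueError("batch_size_days must be positive")
    step = timedelta(days=batch_size_days)
    cutoffs = []
    start = oldest
    while start < clean_before:
        # The last batch is capped so nothing newer than clean_before is deleted.
        cutoffs.append(min(start + step, clean_before).isoformat())
        start += step
    return cutoffs


for cutoff in chunk_timestamps(datetime(2024, 1, 1), datetime(2024, 1, 20), 7):
    print(cutoff)
# 2024-01-08T00:00:00
# 2024-01-15T00:00:00
# 2024-01-20T00:00:00
```

Each cutoff becomes one airflow db clean run, so a smaller batch size means more runs that each delete less data.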

Step 2: Run the DAG

In this step, run the DAG in a local Airflow environment to practice the workflow for cleaning metadata DB data. In a production environment, you would run this cleanup only after your Airflow environment has been running for a while.

  1. Run astro dev start in your Astro project to start Airflow, then open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the astronomer_db_cleanup_dag DAG by clicking the play button, then click Trigger DAG w/ Config. Configure the following params:

    • dry_run: true
    • tables: leave empty to include all tables
    • clean_before_timestamp: keep the default, which is 90 days before the current time
  3. Click Trigger.

  4. After the task completes, click on the Graph tab.

  5. Click on the db_cleanup task.

  6. Click on the Logs tab.

  7. Check that the airflow db clean command completed successfully. Note that if you created a new Astro project for this tutorial, the run will not show much data to be deleted.
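The params from step 2 can also be written out as a JSON config. The following sketch builds one in Python, assuming the timestamp is passed as an ISO 8601 string and that an empty tables list means all tables, as in the DAG's defaults. The build_trigger_conf helper is hypothetical and not part of Airflow.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_trigger_conf(now: Optional[datetime] = None, days: int = 90) -> dict:
    """Return a dry-run config for the cleanup DAG (illustrative helper)."""
    now = now or datetime.now(tz=timezone.utc)
    return {
        "dry_run": True,  # only print the SQL, delete nothing
        "tables": [],  # empty list: fall back to all tables
        "clean_before_timestamp": (now - timedelta(days=days)).isoformat(),
        "batch_size_days": 7,
    }


print(json.dumps(build_trigger_conf(), indent=2))
```

Keeping dry_run set to true in the config is a reasonable default while you are still checking which records a given timestamp would remove.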

You can now use this DAG to periodically clean data from the Airflow metadata DB as needed.
