Skip to main content

Create a data-aware and dynamic ETL pipeline

Apache Airflow®'s Datasets and dynamic task mapping features make it easy to incorporate data-awareness and enhanced automation into your ETL pipelines.

This tutorial is for Apache Airflow users who want to leverage some of Airflow's powerful features supporting enhanced ETL development: data-awareness and dynamic task generation. As you explore Airflow's modern ETL feature set, you'll create a complete Airflow pipeline that ingests data from an API and a local file, loads and transforms the data in an in-memory database, and visualizes the data in a dashboard. You can run and modify the pipeline in GitHub Codespaces or locally using the Astro CLI.

After you complete this tutorial, you'll be able to:

  • Create an ETL pipeline following best practices.
  • Write tasks that extract, transform, and load data from a JSON API and CSV file into an in-memory database.
  • Configure DAGs to trigger using data-aware scheduling with Airflow Datasets.
  • Use dynamic task mapping to automate parallel task creation at runtime.
  • Visualize data in a responsive dashboard.
Other ways to learn

There are multiple resources for learning about this topic. See also:

Time to complete

This tutorial takes approximately 1 hour to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

  • A GitHub account OR the Astro CLI, which requires a container service such as Docker Desktop (v18.09 or higher), Podman, or Colima.

Overview

Your pipeline will employ six Airflow DAGs and the following tools:

  • DuckDB, an in-memory relational database, for storing tables of the ingested data as well as the resulting tables after transformations.
  • Streamlit, a Python package for creating interactive apps, for displaying the data in a dashboard. The Streamlit app will retrieve its data from tables in the DuckDB database.

All tools used are open-source, so you will not need to create additional accounts. Also, you will not need to configure any connections to databases or warehouses.

After completing all tasks, the Streamlit dashboard will look similar to the following screenshots:

Finished Streamlit App Part 1

Finished Streamlit App Part 2

Finished Streamlit App Part 3

Step 1: Set up your project

You have two options for running the project. If you do not want to install any tools locally, you can use GitHub Codespaces. If you prefer to work locally or already have the Astro CLI installed, you can easily run the pipeline locally in Docker.

  1. Fork the ETL with Datasets repository.

  2. Create a new GitHub Codespaces project on your fork. Make sure it uses at least 4 cores!

    Fork repo and create a Codespaces project

  3. Run this command in the Codespaces terminal:

    bash ./.devcontainer/post_creation_script.sh
  4. The Astro CLI will automatically start up all necessary Airflow components as well as the Streamlit service. This can take a few minutes.

  5. Once the Airflow project has started, access the Airflow UI by clicking on the Ports tab and opening the forward URL for port 8080.

    Open Airflow UI URL Codespaces

  6. Once the Streamlit app is running, you can access the dashboard by clicking on the Ports tab and opening the forward URL for port 8501.

    Open Streamlit URL Codespaces

Step 2: Run the project

All DAGs tagged with part_1 are part of a pre-built, fully functional Airflow pipeline. To run them:

  1. Go to include/global_variables/user_input_variables.py and enter your own info for MY_NAME and MY_CITY.

  2. Trigger the start DAG and unpause all DAGs that are tagged with part_1 by clicking on the toggle on their lefthand side. Once the start DAG is unpaused, it will run once, starting the pipeline. You can also run this DAG manually to trigger further pipeline runs by clicking on the play button on the right side of the DAG.

    The DAGs that will run are:

    • start
    • extract_current_weather_data
    • in_climate_data
    • transform_climate_data.
  3. Watch the DAGs run according to their dependencies, which have been set using Datasets.

    Dataset and DAG Dependencies

  4. Open the Streamlit app. If you are using Codespaces, go to the Ports tab and open the URL of the forwarded port 8501. If you are running locally, go to localhost:8501.

  5. View the Streamlit app, now showing global climate data and the current weather for your city.

    Streamlit app

Step 3: Update the pipelines

The two DAGs tagged with part_2 are part of a partially built Airflow pipeline that handles historical weather data. Complete the following steps to finish building the pipeline.

Leverage Airflow Datasets for data-aware scheduling

Datasets allow you to schedule DAGs on the completion of tasks that create or update data assets in your pipelines. Instead of using time-based scheduling, you'll use Datasets to make:

  • extract_historical_weather_data run after completion of a task with an outlet Dataset in the start DAG.
  • transform_historical_weather run after completion of a task with an outlet Dataset in the extract_historical_weather_data DAG.
  1. In include/global_variables/user_input_variables.py, enter your own info for HOT_DAY and BIRTHYEAR if you wish.

  2. Schedule the extract_historical_weather_data DAG on the start_dataset Dataset defined in the start DAG.

    schedule=[start_dataset],
  3. Schedule the transform_historical_weather_data DAG on the extract_dataset outlet Dataset of the turn_json_into_table task, which you will find in the extract_historical_weather_data DAG. As in the previous case, the latter DAG's schedule is currently set to None, and this is the DAG parameter you need to modify.

    schedule=[Dataset("duckdb://include/dwh/historical_weather_data")],

    For more help with using Datasets, see: Datasets and data-aware scheduling in Airflow.

  4. Trigger the start DAG. You should see extract_historical_weather_data run automatically after the start DAG run completes and the transform_historical_weather DAG run automatically after extract_historical_weather_data. Once all DAGs have run, view your Streamlit app to view a graph with hot days per year and a table containing historical weather data.

    Streamlit app

Add dynamic task mapping for automated generation of parallel tasks

The tasks in the extract_historical_weather_data currently retrieve historical weather information for only one city. To retrieve information about three cities instead of just one, you can use dynamic task mapping. The dynamic task mapping feature of Airflow, based on the MapReduce programming model, automatically generates parallel individual tasks for an arbitrary number of inputs. Compared to static code, this approach offers the benefit of atomicity, improved observability, easier recovery from failures, and easier implementation.

You can find more information about dynamic task mapping in Create dynamic Airflow tasks.

Instead of manually creating a task for each additional city, you'll use an expand() function to map any number of cities in the following tasks:

  • the get_lat_long_for_city task in the extract_historical_weather_data DAG.
  • the get_historical_weather task, also in extract_historical_weather_data.
  1. Find the coordinates variable definition (just above the turn_json_into_table task definition). This line instantiates the first task you need to map: get_lat_long_for_city.

  2. Map the task using an expand() and replace the single city with a list of cities. NOTE: each item must be a city!

    coordinates = get_lat_long_for_city.expand(city=["Bern", "Basel", "Zurich"])
  3. Find the historical_weather variable definition (just below the coordinates definition). This line instantiates the second task you need to map: get_historical_weather.

  4. Map the task using expand().

    historical_weather = get_historical_weather.expand(coordinates=coordinates)

    For more guidance on implementing dynamic task mapping, see Create dynamic Airflow tasks.

  5. After completing the exercise, rerun both extract_historical_weather_data and transform_historical_weather_data.

In your Streamlit app, you can now select the different cities from the dropdown box to see how many hot days they had per year.

Streamlit app

tip

Both the in_table dataframe and the output_df dataframe are printed to the logs of the find_hottest_day_birthyear task. The goal is to have an output as in the screenshot below. If your table does not contain information for several cities, make sure you completed the dynamic task mapping correctly.

Streamlit app

Data sources

The global climate data in the local CSV file was retrieved from the Climate Change: Earth Surface Temperature Data Kaggle dataset by Berkely Earth and Kristen Sissener, which was uploaded under CC BY-NC-SA 4.0.

The current and historical weather data are queried from the Open Meteo API (CC BY 4.0).

Was this page helpful?