Create a data-aware and dynamic ETL pipeline
Apache Airflow®'s Datasets and dynamic task mapping features make it easy to incorporate data-awareness and enhanced automation into your ETL pipelines.
This tutorial is for Apache Airflow users who want to leverage some of Airflow's powerful features supporting enhanced ETL development: data-awareness and dynamic task generation. As you explore Airflow's modern ETL feature set, you'll create a complete Airflow pipeline that ingests data from an API and a local file, loads and transforms the data in an in-memory database, and visualizes the data in a dashboard. You can run and modify the pipeline in GitHub Codespaces or locally using the Astro CLI.
After you complete this tutorial, you'll be able to:
- Create an ETL pipeline following best practices.
- Write tasks that extract, transform, and load data from a JSON API and CSV file into an in-memory database.
- Configure DAGs to trigger using data-aware scheduling with Airflow Datasets.
- Use dynamic task mapping to automate parallel task creation at runtime.
- Visualize data in a responsive dashboard.
Time to complete
This tutorial takes approximately 1 hour to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Basic Python. See the Python Documentation.
Prerequisites
- A GitHub account OR the Astro CLI, which requires a container service such as Docker Desktop (v18.09 or higher), Podman, or Colima.
Overview
Your pipeline will employ six Airflow DAGs and the following tools:
- DuckDB, an in-memory relational database, for storing tables of the ingested data as well as the resulting tables after transformations.
- Streamlit, a Python package for creating interactive apps, for displaying the data in a dashboard. The Streamlit app will retrieve its data from tables in the DuckDB database.
All tools used are open-source, so you will not need to create additional accounts. Also, you will not need to configure any connections to databases or warehouses.
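If you're curious how the dashboard reads from the database, the pattern is a plain DuckDB query inside the Streamlit script. The following is a minimal, hypothetical sketch of that pattern, not the project's actual app code; the database path, table name, and column names are assumptions for illustration.

```python
# Minimal sketch of the DuckDB-to-Streamlit pattern; the database path,
# table name, and column names below are illustrative assumptions.
import duckdb
import streamlit as st

# Connect to a file-backed DuckDB database written by the Airflow pipeline.
conn = duckdb.connect("include/dwh")  # hypothetical path

# Pull a table written by one of the DAGs into a pandas DataFrame.
df = conn.execute("SELECT * FROM global_climate").fetch_df()  # hypothetical table

st.title("Climate and weather dashboard")
st.dataframe(df)                                    # show the raw table
st.line_chart(df.set_index("year")["temperature"])  # hypothetical columns
```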
After completing all tasks, the Streamlit dashboard will look similar to the following screenshots:
Step 1: Set up your project
You have two options for running the project. If you do not want to install any tools locally, you can use GitHub Codespaces. If you prefer to work locally or already have the Astro CLI installed, you can easily run the pipeline locally in Docker.
GitHub Codespaces

1. Fork the ETL with Datasets repository.
2. Create a new GitHub Codespaces project on your fork. Make sure it uses at least 4 cores.
3. Run this command in the Codespaces terminal: `bash ./.devcontainer/post_creation_script.sh`
4. The Astro CLI automatically starts all necessary Airflow components as well as the Streamlit service. This can take a few minutes.
5. Once the Airflow project has started, access the Airflow UI by clicking on the Ports tab and opening the forwarded URL for port `8080`.
6. Once the Streamlit app is running, access the dashboard by clicking on the Ports tab and opening the forwarded URL for port `8501`.
Astro CLI

Download the Astro CLI to run Airflow locally in Docker; `astro` is the only package you will need to install.

1. Run this command in your terminal to create a local clone of the repository: `git clone https://github.com/astronomer/etl-datasets-tutorial.git`
2. Install the Astro CLI by following the steps in the Astro CLI documentation. As mentioned, Docker Desktop/Docker Engine is a prerequisite, but you don't need in-depth Docker knowledge to run Airflow with the Astro CLI.
3. Run `astro dev start` in your cloned repository.
4. After your Astro project has started, you can access the Airflow UI at `localhost:8080`.
5. Access the Streamlit app at `localhost:8501`. NOTE: The Streamlit container can take a few moments to start up.
Step 2: Run the project
All DAGs tagged with `part_1` are part of a pre-built, fully functional Airflow pipeline. To run them:

1. Go to `include/global_variables/user_input_variables.py` and enter your own info for `MY_NAME` and `MY_CITY` (a hypothetical sketch of this file follows these steps).
2. Trigger the `start` DAG and unpause all DAGs that are tagged with `part_1` by clicking the toggle on their left-hand side. Once the `start` DAG is unpaused, it will run once, starting the pipeline. You can also run this DAG manually to trigger further pipeline runs by clicking the play button on the right side of the DAG. The DAGs that will run are: `start`, `extract_current_weather_data`, `in_climate_data`, and `transform_climate_data`.
3. Watch the DAGs run according to their dependencies, which have been set using Datasets.
4. Open the Streamlit app. If you are using Codespaces, go to the Ports tab and open the URL of the forwarded port `8501`. If you are running locally, go to `localhost:8501`.
5. View the Streamlit app, now showing global climate data and the current weather for your city.
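For orientation, the user input file referenced in step 1 is a small Python module of constants. The sketch below is hypothetical: the exact variables, their meanings, and any defaults may differ in the repository, and the values shown are placeholders.

```python
# include/global_variables/user_input_variables.py -- hypothetical sketch only;
# check the actual file in the repository for the real variables and comments.
MY_NAME = "Ada"     # shown in the dashboard greeting (assumed)
MY_CITY = "Bern"    # city used for the current weather DAG (assumed)
HOT_DAY = 25        # assumed temperature threshold in degrees Celsius for "hot" days
BIRTHYEAR = 1990    # assumed year used by the historical weather transformation
```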
Step 3: Update the pipelines
The two DAGs tagged with `part_2` are part of a partially built Airflow pipeline that handles historical weather data. Complete the following steps to finish building the pipeline.
Leverage Airflow Datasets for data-aware scheduling
Datasets allow you to schedule DAGs on the completion of tasks that create or update data assets in your pipelines. Instead of using time-based scheduling, you'll use Datasets to make:

- `extract_historical_weather_data` run after completion of a task with an outlet Dataset in the `start` DAG.
- `transform_historical_weather` run after completion of a task with an outlet Dataset in the `extract_historical_weather_data` DAG.
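If you haven't used Datasets before, the general pattern looks like the following minimal sketch: a producer task declares an outlet Dataset, and a consumer DAG lists that Dataset as its schedule. The DAG names and Dataset URI here are illustrative, not the tutorial's exact code.

```python
# Minimal sketch of data-aware scheduling with Datasets; names are illustrative.
from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

example_dataset = Dataset("duckdb://include/dwh/example_table")  # hypothetical URI

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def producer_dag():
    @task(outlets=[example_dataset])  # completing this task updates the Dataset
    def update_table():
        ...  # write to the table the Dataset represents

    update_table()

@dag(start_date=datetime(2023, 1, 1), schedule=[example_dataset], catchup=False)
def consumer_dag():
    @task
    def read_table():
        ...  # runs only after the producer task has completed successfully

    read_table()

producer_dag()
consumer_dag()
```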
1. In `include/global_variables/user_input_variables.py`, enter your own info for `HOT_DAY` and `BIRTHYEAR` if you wish.
2. Schedule the `extract_historical_weather_data` DAG on the `start_dataset` Dataset defined in the `start` DAG: `schedule=[start_dataset],`
3. Schedule the `transform_historical_weather_data` DAG on the `extract_dataset` outlet Dataset of the `turn_json_into_table` task, which you will find in the `extract_historical_weather_data` DAG. The `transform_historical_weather_data` DAG's schedule is currently set to `None`; this is the DAG parameter you need to modify: `schedule=[Dataset("duckdb://include/dwh/historical_weather_data")],`
For more help with using Datasets, see: Datasets and data-aware scheduling in Airflow.
4. Trigger the `start` DAG. You should see `extract_historical_weather_data` run automatically after the `start` DAG run completes, and the `transform_historical_weather` DAG run automatically after `extract_historical_weather_data`. Once all DAGs have run, check your Streamlit app to see a graph of hot days per year and a table containing historical weather data.
Add dynamic task mapping for automated generation of parallel tasks
The tasks in the `extract_historical_weather_data` DAG currently retrieve historical weather information for only one city. To retrieve information about three cities instead of one, you can use dynamic task mapping. Airflow's dynamic task mapping feature, based on the MapReduce programming model, automatically generates parallel task instances for an arbitrary number of inputs at runtime. Compared to static code, this approach offers the benefits of atomicity, improved observability, easier recovery from failures, and easier implementation.
You can find more information about dynamic task mapping in Create dynamic Airflow tasks.
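As a standalone reference before you edit the DAG, here is a minimal sketch of dynamic task mapping with `expand()`; the task names and inputs are illustrative and unrelated to the tutorial's DAGs.

```python
# Minimal sketch of dynamic task mapping; task names and inputs are illustrative.
from pendulum import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def mapping_example_dag():
    @task
    def double(number: int) -> int:
        # one mapped task instance is created per input at runtime
        return number * 2

    @task
    def sum_all(values) -> int:
        # the downstream task receives the results of all mapped instances
        return sum(values)

    doubled = double.expand(number=[1, 2, 3])  # creates three parallel task instances
    sum_all(doubled)

mapping_example_dag()
```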
Instead of manually creating a task for each additional city, you'll use an `expand()` function to map any number of cities in the following tasks:

- the `get_lat_long_for_city` task in the `extract_historical_weather_data` DAG.
- the `get_historical_weather` task, also in `extract_historical_weather_data`.
1. Find the `coordinates` variable definition (just above the `turn_json_into_table` task definition). This line instantiates the first task you need to map: `get_lat_long_for_city`.
2. Map the task using `expand()` and replace the single city with a list of cities. NOTE: each item in the list must be a single city! `coordinates = get_lat_long_for_city.expand(city=["Bern", "Basel", "Zurich"])`
3. Find the `historical_weather` variable definition (just below the `coordinates` definition). This line instantiates the second task you need to map: `get_historical_weather`.
4. Map the task using `expand()`: `historical_weather = get_historical_weather.expand(coordinates=coordinates)` Because `coordinates` is the output of a mapped task, Airflow creates one `get_historical_weather` task instance for each set of coordinates returned upstream.
For more guidance on implementing dynamic task mapping, see Create dynamic Airflow tasks.
5. After completing the exercise, rerun both `extract_historical_weather_data` and `transform_historical_weather_data`.
In your Streamlit app, you can now select the different cities from the dropdown box to see how many hot days they had per year.
Both the `in_table` dataframe and the `output_df` dataframe are printed to the logs of the `find_hottest_day_birthyear` task. The goal is to have output like the screenshot below. If your table does not contain information for several cities, make sure you completed the dynamic task mapping correctly.
Data sources
The global climate data in the local CSV file was retrieved from the Climate Change: Earth Surface Temperature Data Kaggle dataset by Berkeley Earth and Kristen Sissener, which was uploaded under CC BY-NC-SA 4.0.
The current and historical weather data are queried from the Open Meteo API (CC BY 4.0).
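For reference, current weather can be pulled from Open Meteo with a plain HTTP request, as in the minimal sketch below; the coordinates are illustrative and this is not the tutorial's extraction code.

```python
# Minimal sketch of querying the Open Meteo current weather endpoint.
# The coordinates are illustrative (roughly Bern, Switzerland).
import requests

response = requests.get(
    "https://api.open-meteo.com/v1/forecast",
    params={"latitude": 46.95, "longitude": 7.45, "current_weather": "true"},
    timeout=10,
)
response.raise_for_status()
print(response.json()["current_weather"])  # temperature, windspeed, winddirection, ...
```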