Write a DAG with the Astro Python SDK
The Astro Python SDK is no longer maintained and may not work or be up to date with newer versions of Airflow.
This tutorial demonstrates how to write an Extract, Transform, Load (ETL) pipeline on your local machine with the Astro Python SDK. The Astro SDK is maintained by Astronomer and simplifies the pipeline authoring process with native Python functions for common data orchestration use cases.
The pipeline you build in this tutorial will:
- Extract data from an S3 bucket and load it into a Snowflake relational table.
- Transform that table.
- Load the transformed table into a reporting table.
The example DAG in this tutorial uses Amazon S3 and Snowflake, but you can replace these with any supported data sources and tables simply by changing connection information.
This guide is based on the latest version of the Astro SDK. If you're using an earlier version of the Astro SDK, you might need to modify the configuration steps or code. See the Astro Python SDK Changelog for a complete list of changes in each version.
There are multiple resources for learning about this topic. See also:
- Astronomer Academy: Astro: SDK module.
- Webinar: Simplified DAG authoring with the Astro SDK.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Basic Python and SQL.
- Basic knowledge of Amazon S3 and Snowflake.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
Prerequisites
To complete this tutorial, you need:
- An AWS account with an S3 storage bucket. If you don't already have an account, Amazon offers 5GB of free storage in S3 for 12 months. This should be more than enough for this tutorial.
- A Snowflake Enterprise account. If you don't already have an account, Snowflake offers a free 30-day trial.
- The Astro CLI.
Step 1: Set up your data stores
For this example, you first need to load a file into S3 and create a destination for the data in Snowflake. In this case, the data has three example orders with distinct customers, purchase dates, and amounts.
1. On your local machine, create a file named `orders_data_header.csv` with the following data:

   ```text
   order_id,customer_id,purchase_date,amount
   ORDER1,CUST1,1/1/2021,100
   ORDER2,CUST2,2/2/2022,200
   ORDER3,CUST3,3/3/2023,300
   ```

2. Upload `orders_data_header.csv` to your S3 bucket.

3. In Snowflake, create a new worksheet and run the following SQL commands:

   ```sql
   CREATE DATABASE ASTRO_SDK_DB;
   CREATE WAREHOUSE ASTRO_SDK_DW;
   CREATE SCHEMA ASTRO_SDK_SCHEMA;
   ```

   These commands create all of the required data stores for the load step of your pipeline. Ensure that you have `ACCOUNTADMIN` permissions for your newly created database.
Step 2: Set up your Airflow environment
Now that you have a staging table in Snowflake and some example data ready to load, you need to set up a local Airflow environment for your DAG to run in. This is easy with the Astro CLI.
1. Create a new Astro project:

   ```bash
   $ mkdir astro-sdk-tutorial && cd astro-sdk-tutorial
   $ astro dev init
   ```

2. Add the following line to the `requirements.txt` file of your Astro project:

   ```text
   astro-sdk-python[amazon,snowflake]>=0.11
   ```

   This installs the SDK package as well as the Snowflake and Amazon providers, which are required for any DAG that loads data to or from Amazon S3 and Snowflake. If you build this pipeline with different database services, you'll have to modify this line.
3. Set the following environment variable to allow Airflow's XCom backend to handle serialization and deserialization of the SDK's custom constructs. If you're using the Astro CLI, add it to the `.env` file of your Astro project:

   ```text
   AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES = airflow\.* astro\.*
   ```

   If you're using Airflow 2.4 or earlier, set the following environment variables instead to use a required custom XCom backend:

   ```text
   AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID=<your_aws_conn>
   AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL='s3://<your-bucket>/xcom/'
   AIRFLOW__CORE__XCOM_BACKEND='astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend'
   ```

   Learn more about how the Astro SDK uses XCom backends in the Astro SDK documentation.
4. Run the following command to start your project in a local environment:

   ```bash
   astro dev start
   ```
Step 3: Create Airflow connections to S3 and Snowflake
1. Open the Airflow UI at `http://localhost:8080/`.

2. Go to Admin > Connections.

3. Create a new connection to AWS S3 with the following values:

   - Connection ID: `aws_default`
   - Connection type: `Amazon Web Services`
   - AWS Access Key ID: `<your_access_key>`
   - AWS Secret Access Key: `<your_secret_access_key>`

4. Create a new connection to Snowflake with the following values:

   - Connection ID: `snowflake_default`
   - Connection type: `Snowflake`
   - Schema: `ASTRO_SDK_SCHEMA`
   - Login: `<your_login>`
   - Password: `<your_password>`
   - Account: `<your_account>`. This is the account name found in the URL where you log into Snowflake; it has the format `xy12345`.
   - Warehouse: `ASTRO_SDK_DW`
   - Database: `ASTRO_SDK_DB`
   - Region: `<your_region>` (for example, `us-east-1` or `us-central1.gcp`)
   - Role: `ACCOUNTADMIN`
Step 4: Create and populate some tables in Snowflake
Create some auxiliary tables in Snowflake and populate them with a small amount of data for this ETL example.
1. In your Snowflake worksheet, create and populate a `customers_table`. You'll join this table with an orders table that you create with the Astro Python SDK:

   ```sql
   CREATE OR REPLACE TABLE customers_table (
     customer_id CHAR(10), customer_name VARCHAR(100), type VARCHAR(10)
   );

   INSERT INTO customers_table (CUSTOMER_ID, CUSTOMER_NAME, TYPE)
   VALUES ('CUST1','NAME1','TYPE1'),('CUST2','NAME2','TYPE1'),('CUST3','NAME3','TYPE2');
   ```

2. Create and populate a reporting table. This is where you'll merge your transformed data:

   ```sql
   CREATE OR REPLACE TABLE reporting_table (
     CUSTOMER_ID CHAR(30), CUSTOMER_NAME VARCHAR(100), ORDER_ID CHAR(10),
     PURCHASE_DATE DATE, AMOUNT FLOAT, TYPE CHAR(10)
   );

   INSERT INTO reporting_table (CUSTOMER_ID, CUSTOMER_NAME, ORDER_ID, PURCHASE_DATE, AMOUNT, TYPE) VALUES
   ('INCORRECT_CUSTOMER_ID','INCORRECT_CUSTOMER_NAME','ORDER2','2/2/2022',200,'TYPE1'),
   ('CUST3','NAME3','ORDER3','3/3/2023',300,'TYPE2'),
   ('CUST4','NAME4','ORDER4','4/4/2022',400,'TYPE2');
   ```
Step 5: Write a DAG for a simple ETL workflow
Use your favorite code editor to copy the following code into a `.py` file in your project's `dags` directory:
```python
from datetime import datetime

from airflow.models import DAG
from pandas import DataFrame

# Import decorators and classes from the SDK
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table

# Import SQLAlchemy to set constraints on some temporary tables
import sqlalchemy

# Define constants for interacting with external systems
S3_FILE_PATH = "s3://<aws-bucket-name>"
S3_CONN_ID = "aws_default"
SNOWFLAKE_CONN_ID = "snowflake_default"
SNOWFLAKE_ORDERS = "orders_table"
SNOWFLAKE_CUSTOMERS = "customers_table"
SNOWFLAKE_REPORTING = "reporting_table"


# Define an SQL query for our transform step as a Python function using the SDK.
# This function filters out all rows with an amount value less than 150.
@aql.transform
def filter_orders(input_table: Table):
    return "SELECT * FROM {{input_table}} WHERE amount > 150"


# Define an SQL query for our transform step as a Python function using the SDK.
# This function joins two tables into a new table.
@aql.transform
def join_orders_customers(filtered_orders_table: Table, customers_table: Table):
    return """SELECT c.customer_id, customer_name, order_id, purchase_date, amount, type
    FROM {{filtered_orders_table}} f JOIN {{customers_table}} c
    ON f.customer_id = c.customer_id"""


# Define a function for transforming tables to dataframes
@aql.dataframe
def transform_dataframe(df: DataFrame):
    purchase_dates = df.loc[:, "purchase_date"]
    print("purchase dates:", purchase_dates)
    return DataFrame(purchase_dates)


# Basic DAG definition. Run the DAG starting January 1st, 2019 on a daily schedule.
dag = DAG(
    dag_id="astro_orders",
    start_date=datetime(2019, 1, 1),
    schedule="@daily",
    catchup=False,
)

with dag:
    # Load a file with a header from S3 into a temporary Table, referenced by the
    # variable `orders_data`. This simulates the `extract` step of the ETL pipeline.
    orders_data = aql.load_file(
        # Data file needs to have a header row. The input and output table can be
        # replaced with any valid file and connection ID.
        input_file=File(
            path=S3_FILE_PATH + "/orders_data_header.csv", conn_id=S3_CONN_ID
        ),
        output_table=Table(
            conn_id=SNOWFLAKE_CONN_ID,
            # Apply constraints to the columns of the temporary output table,
            # which is a requirement for running the `.merge` function later in the DAG.
            columns=[
                sqlalchemy.Column("order_id", sqlalchemy.String(60), primary_key=True),
                sqlalchemy.Column(
                    "customer_id",
                    sqlalchemy.String(60),
                    nullable=False,
                    key="customer_id",
                ),
                sqlalchemy.Column(
                    "purchase_date",
                    sqlalchemy.String(60),
                    nullable=False,
                    key="purchase_date",
                ),
                sqlalchemy.Column(
                    "amount", sqlalchemy.Integer, nullable=False, key="amount"
                ),
            ],
        ),
    )

    # Create a Table object for customer data in the Snowflake database
    customers_table = Table(
        name=SNOWFLAKE_CUSTOMERS,
        conn_id=SNOWFLAKE_CONN_ID,
    )

    # Filter the orders data and then join with the customer table,
    # saving the output into a temporary table referenced by the Table instance `joined_data`
    joined_data = join_orders_customers(filter_orders(orders_data), customers_table)

    # Merge the joined data into the reporting table based on the order_id.
    # If there's a conflict in the customer_id or customer_name, then use the ones from
    # the joined data
    reporting_table = aql.merge(
        target_table=Table(
            name=SNOWFLAKE_REPORTING,
            conn_id=SNOWFLAKE_CONN_ID,
        ),
        source_table=joined_data,
        target_conflict_columns=["order_id"],
        columns=["customer_id", "customer_name"],
        if_conflicts="update",
    )

    # Transform the reporting table into a dataframe
    purchase_dates = transform_dataframe(reporting_table)

    # Delete temporary and unnamed tables created by `load_file` and `transform`, in this
    # example both `orders_data` and `joined_data`
    aql.cleanup()
```
This DAG extracts the data from S3, loads it into a Snowflake table, and runs a few simple SQL statements to clean the data, load it into a reporting table, and transform it into a dataframe so that you can print various table details to Airflow logs using Python.
Much of this DAG's functionality comes from the Python functions that use task decorators from the Python SDK. See How it works for more information about these decorators and the benefits of using the SDK for this implementation.
Step 6: Run the code
1. In the Airflow UI, you should see a DAG called `astro_orders`. Make it active by clicking the slider next to its name.

2. Click the play button next to the DAG's name to run the DAG.

3. Click the DAG's name to see how it ran in the Grid view.
How it works
The example DAG uses the TaskFlow API and decorators to define dependencies between tasks. If you're used to defining dependencies with bitshift operators, this might not look familiar. Essentially, the TaskFlow API abstracts dependencies, XComs, and other boilerplate DAG code so that you can define task dependencies with function invocations.
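If you're more familiar with the bitshift style, the following minimal sketch (a hypothetical two-task DAG, not part of this tutorial) shows the TaskFlow equivalent: the dependency between `extract` and `transform` is created by the function call itself.

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2019, 1, 1), schedule="@daily", catchup=False)
def taskflow_example():
    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(values):
        return [v * 2 for v in values]

    # Calling transform() with extract()'s return value passes the data through XCom
    # and sets the extract >> transform dependency for you.
    transform(extract())


taskflow_example()
```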
The Astro SDK takes this abstraction a step further while providing more flexibility to your code. The most important details are:
- Using `aql` decorators, you can run both SQL and Python within a Pythonic context. This example DAG uses decorators to run both SQL queries and Python code.

- The Astro SDK includes a `Table` object which contains all of the metadata that's necessary for handling SQL table creation between Airflow tasks. When a `Table` is passed into a function, the Astro SDK automatically passes all connection, XCom, and metadata configuration to the task.

  The example DAG demonstrates one of the key powers of the `Table` object. When the DAG ran `join_orders_customers`, it joined two tables that had different connections and schemas. The Astro SDK automatically creates a temporary table and handles joining the tables. This also means that you can replace the S3 and Snowflake configurations with any valid configurations for other supported data stores and the code will still work. The Astro SDK handles all of the translation between services and database types in the background.

- The Astro SDK can automatically convert SQL tables to pandas DataFrames using the `aql.dataframe` decorator, meaning you can run complex ML models and SQL queries on the same data in the same DAG without any additional configuration (see the sketch after this list).
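As a sketch of that last point, the hypothetical `orders_over_time` task below (not part of the tutorial DAG) receives a SQL table as a pandas DataFrame, so plain pandas can run on data that an upstream task produced with SQL:

```python
from astro import sql as aql
from pandas import DataFrame


@aql.dataframe
def orders_over_time(df: DataFrame):
    # Plain pandas on data that arrived as a SQL table: count orders per purchase date
    return df.groupby("purchase_date", as_index=False)["order_id"].count()


# Inside a DAG you could call this with a Table or an upstream result, for example:
# orders_over_time(reporting_table)
```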
Now that you understand the core qualities of the Astro SDK, let's look at it in the context of the example DAG by walking through each step in your ETL pipeline.
Load
To load data from S3 into a SQL Table, you only need to specify the location of the data on S3 and an Airflow connection for the destination SQL table in Snowflake.
Because the content of `orders_data` isn't needed after the DAG is completed, it's specified without a name. When you define a `Table` object without a preexisting name, that table is considered a temporary table. The Astro SDK deletes all temporary tables after you run `aql.cleanup()` in your DAG.
In this example, a schema is used to define the temporary table because a temporary table can only be merged in Snowflake when it includes constraints. Not all databases require constraints for temporary tables. For a list of databases that require constraints for temporary tables, see the Astro Python SDK documentation.
```python
# Load a file with a header from S3 into a temporary Table, referenced by the
# variable `orders_data`
orders_data = aql.load_file(
    # data file needs to have a header row
    input_file=File(path=S3_FILE_PATH + "/orders_data_header.csv", conn_id=S3_CONN_ID),
    output_table=Table(
        conn_id=SNOWFLAKE_CONN_ID,
        columns=[
            sqlalchemy.Column("order_id", sqlalchemy.String(60), primary_key=True),
            sqlalchemy.Column(
                "customer_id",
                sqlalchemy.String(60),
                nullable=False,
                key="customer_id",
            ),
            sqlalchemy.Column(
                "purchase_date",
                sqlalchemy.String(60),
                nullable=False,
                key="purchase_date",
            ),
            sqlalchemy.Column(
                "amount", sqlalchemy.Integer, nullable=False, key="amount"
            ),
        ],
    ),
)
```
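By contrast, if you give the output `Table` a name, the SDK doesn't treat it as temporary and `aql.cleanup()` leaves it in place. A minimal sketch (the table name here is hypothetical and not part of the tutorial DAG):

```python
# Because the output Table has a name, it is a permanent table and survives aql.cleanup()
named_orders = aql.load_file(
    input_file=File(path=S3_FILE_PATH + "/orders_data_header.csv", conn_id=S3_CONN_ID),
    output_table=Table(name="orders_data_persistent", conn_id=SNOWFLAKE_CONN_ID),
)
```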
Transform
You can filter your loaded table from S3 and join it to a Snowflake table with a single line of code. The result of this function is a temporary table referenced by the `joined_data` variable:
```python
@aql.transform
def filter_orders(input_table: Table):
    return "SELECT * FROM {{input_table}} WHERE amount > 150"


@aql.transform
def join_orders_customers(filtered_orders_table: Table, customers_table: Table):
    return """SELECT c.customer_id, customer_name, order_id, purchase_date, amount, type
    FROM {{filtered_orders_table}} f JOIN {{customers_table}} c
    ON f.customer_id = c.customer_id"""


# Create a Table object for customer data in our Snowflake database
customers_table = Table(
    name=SNOWFLAKE_CUSTOMERS,
    conn_id=SNOWFLAKE_CONN_ID,
)

# Filter the orders data and then join with the customer table,
# saving the output into a temporary table referenced by the Table instance `joined_data`
joined_data = join_orders_customers(filter_orders(orders_data), customers_table)
```
Because you defined `customers_table`, it is not considered temporary and will not be deleted after running `aql.cleanup()`.
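An `@aql.transform` function can also template values other than tables into its SQL. The following is a minimal sketch, assuming your SDK version renders non-`Table` keyword arguments as parameters; `filter_orders_by_amount` and `min_amount` are hypothetical and not part of the tutorial DAG:

```python
@aql.transform
def filter_orders_by_amount(input_table: Table, min_amount: int):
    # Non-Table arguments are rendered into the SQL statement as parameters
    return "SELECT * FROM {{input_table}} WHERE amount > {{min_amount}}"


# Called inside the DAG, for example:
# filtered = filter_orders_by_amount(orders_data, min_amount=150)
```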
Merge
To merge your processed data into a reporting table, you can call the `aql.merge` function:
```python
# Merge the joined data into our reporting table, based on the order_id.
# If there's a conflict in the customer_id or customer_name, then use the ones from
# the joined data
reporting_table = aql.merge(
    target_table=Table(
        name=SNOWFLAKE_REPORTING,
        conn_id=SNOWFLAKE_CONN_ID,
    ),
    source_table=joined_data,
    target_conflict_columns=["order_id"],
    columns=["customer_id", "customer_name"],
    if_conflicts="update",
)
```
`aql.merge` is database agnostic. It automatically handles all background XComs and configuration that are required when working with tables from separate sources.
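The `if_conflicts` argument controls what happens when a row with the same `order_id` already exists in the target table. A minimal sketch, assuming your SDK version also supports the `"ignore"` strategy, which keeps the existing target rows instead of updating them:

```python
# Keep existing reporting_table rows untouched when an order_id already exists
reporting_table_keep_existing = aql.merge(
    target_table=Table(name=SNOWFLAKE_REPORTING, conn_id=SNOWFLAKE_CONN_ID),
    source_table=joined_data,
    target_conflict_columns=["order_id"],
    columns=["customer_id", "customer_name"],
    if_conflicts="ignore",
)
```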
Dataframe transformation
To illustrate the power of the `@aql.dataframe` decorator, the DAG converts your reporting table to a dataframe and runs a simple operation on it:
```python
@aql.dataframe
def transform_dataframe(df: DataFrame):
    purchase_dates = df.loc[:, "purchase_date"]
    print("purchase dates:", purchase_dates)
    return DataFrame(purchase_dates)
```
You can find the output of this function in the logs of the final task.
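Because the returned DataFrame travels through XCom, a downstream task can consume it directly. A minimal sketch (the `count_dates` task is hypothetical and not part of the tutorial DAG):

```python
@aql.dataframe
def count_dates(purchase_dates: DataFrame):
    # The upstream task's returned DataFrame arrives here via XCom
    print("number of purchase dates:", len(purchase_dates))


# Inside the DAG:
# count_dates(transform_dataframe(reporting_table))
```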
Clean up temporary tables
Temporary tables can be created by passing `temp=True` when defining a `Table` object, or by simply defining a `Table` object without a name. The example DAG creates two temporary tables: `joined_data` and `orders_data`.
To remove these tables from your database once the DAG completes, you can delete them using the `aql.cleanup()` function:
```python
# Delete all temporary tables
aql.cleanup()
```
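If you prefer to be explicit about which tables get dropped, `aql.cleanup()` can also take a list of tables instead of cleaning up every temporary table in the DAG; a minimal sketch, assuming your SDK version supports passing the tables explicitly:

```python
# Only clean up the two temporary tables created in this DAG
aql.cleanup([orders_data, joined_data])
```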
Conclusion
You now know how to use the Astro SDK to write a common ETL pipeline! More specifically, you can now:
- Pass connection and database information to downstream Airflow tasks.
- Merge tables from different schemas and platforms without using XComs.
- Run SQL in an entirely Pythonic context.
As a next step, read the Astro Python SDK documentation to learn more about its functionality and task decorators.