
Using Airflow to Execute SQL

Executing SQL queries is one of the most common use cases for data pipelines. Whether you're extracting and loading data, calling a stored procedure, or executing a complex query for a report, Airflow has you covered. Using Airflow, you can orchestrate all of your SQL tasks elegantly with just a few lines of boilerplate code.

In this guide you'll learn about the best practices for executing SQL from your DAG, review the most commonly used Airflow SQL-related operators, and then use sample code to implement a few common SQL use cases.

info

All code used in this guide is located in the Astronomer GitHub.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Best practices for executing SQL from your DAG

No matter what database or SQL version you're using, there are many ways to execute your queries using Airflow. Once you determine how to execute your queries, the following tips will help you keep your DAGs clean, readable, and efficient for execution.

Use hooks and operators

Using hooks and operators whenever possible makes your DAGs easier to read and maintain, and can improve performance. The SQL-related operators included with Airflow can significantly reduce the amount of code you need to execute your queries.
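To see what a hook saves you from writing, here is a stdlib-only sketch of the pattern hooks encapsulate: resolve a connection ID to connection details, open a connection, run the query, and always close the connection. `MiniSqliteHook` and the `MINI_CONN_*` environment variables are hypothetical names invented for this illustration, loosely modeled on the `get_records` style of Airflow's database hooks. Real Airflow hooks get their connection details from Airflow's configured connection sources (the metadata database, environment variables, or a secrets backend), not from code like this.

```python
import os
import sqlite3
from contextlib import closing


class MiniSqliteHook:
    """Hypothetical, minimal stand-in for a database hook (not an Airflow class).

    Task code only passes a connection ID; the hook handles looking up
    connection details and managing the connection lifecycle.
    """

    def __init__(self, conn_id: str):
        self.conn_id = conn_id

    def _database_path(self) -> str:
        # Hypothetical lookup: MINI_CONN_<CONN_ID> holds a SQLite file path.
        # Falls back to an in-memory database if the variable is unset.
        return os.environ.get(f"MINI_CONN_{self.conn_id.upper()}", ":memory:")

    def get_records(self, sql: str, parameters=None) -> list:
        # Open the connection, run the query, and close it even on errors.
        with closing(sqlite3.connect(self._database_path())) as conn:
            return conn.execute(sql, parameters or ()).fetchall()


hook = MiniSqliteHook(conn_id="demo")
print(hook.get_records("SELECT 1 + 1"))  # [(2,)]
```

The design point is that connection handling lives in one reusable class, so each task stays a one-line call.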

Keep lengthy SQL code out of your DAG

Astronomer recommends avoiding top-level code in your DAG file. If you have a SQL query, keep it in its own .sql file and import it into your DAG.

If you use the Astro CLI, you can store scripts like SQL queries in the include/ directory:

├─ dags/  
| └─ example-dag.py
├─ plugins/
├─ include/
| ├─ query1.sql
| └─ query2.sql
├─ Dockerfile
├─ packages.txt
└─ requirements.txt

An exception to this rule could be very short queries (such as SELECT * FROM table). Putting one-line queries like this directly in the DAG is fine if it makes your code more readable.
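The `template_searchpath` DAG argument used in the examples below is what lets an operator accept a bare file name such as `query1.sql`. The stdlib sketch that follows only illustrates the idea of resolving a `.sql` file name against a list of search directories. `find_sql_file` is a hypothetical helper, not how Airflow's Jinja template loader is actually implemented.

```python
import tempfile
from pathlib import Path


def find_sql_file(name: str, search_path: list) -> str:
    """Return the text of the first file called `name` found in search_path."""
    for directory in search_path:
        candidate = Path(directory) / name
        if candidate.is_file():
            return candidate.read_text()
    raise FileNotFoundError(f"{name} not found in {search_path}")


# Simulate an Astro project's include/ directory in a temporary folder.
with tempfile.TemporaryDirectory() as project:
    include_dir = Path(project) / "include"
    include_dir.mkdir()
    (include_dir / "query1.sql").write_text("SELECT * FROM table;\n")

    # The DAG code only needs the file name; the query text lives in include/.
    query = find_sql_file("query1.sql", [str(include_dir)])
    print(query.strip())  # SELECT * FROM table;
```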

Keep transformations in SQL

Remember that Airflow is primarily an orchestrator, not a transformation framework. While you have the full power of Python in your DAG, Astronomer recommends offloading as much of your transformation logic as possible to third-party transformation frameworks. With SQL, this means completing the transformations within your query whenever possible.
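As a small illustration of pushing work into the query, the stdlib sketch below uses an in-memory SQLite database (standing in for your warehouse) and computes per-state totals with `GROUP BY`, so only the aggregated rows travel back to Python instead of every raw row. The `sales` table and its columns are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (state TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("CA", 10), ("CA", 5), ("NY", 7), ("TX", 3), ("TX", 4)],
)

# Preferred: let the database aggregate; Python only receives the result.
totals = conn.execute(
    "SELECT state, SUM(amount) FROM sales GROUP BY state ORDER BY state"
).fetchall()
print(totals)  # [('CA', 15), ('NY', 7), ('TX', 7)]

# Less efficient: pull every row into Python and aggregate there.
python_totals = {}
for state, amount in conn.execute("SELECT state, amount FROM sales"):
    python_totals[state] = python_totals.get(state, 0) + amount

# Both approaches agree, but the SQL version moves far less data.
assert dict(totals) == python_totals
conn.close()
```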

SQL operators

To make working with SQL easier, Airflow includes many built-in operators. This guide discusses some of the most commonly used operators and shouldn't be considered a definitive resource. For more information about the available Airflow operators, see airflow.operators.

info

In Airflow 2+, provider packages are separate from the core of Airflow. If you're running Airflow 2+, you might need to install separate packages (such as apache-airflow-providers-snowflake) to use the hooks, operators, and connections described here. In an Astro project, you can do this by adding the package names to your requirements.txt file. To learn more, see the Airflow documentation on provider packages.

Action operators

In Airflow, action operators execute a function. You can use action operators (or hooks if no operator is available) to execute a SQL query against a database. Commonly used SQL-related action operators include:

Transfer operators

Transfer operators move data from a source to a destination. For SQL-related tasks, they can often be used in the 'Extract-Load' portion of an ELT pipeline and can significantly reduce the amount of code you need to write. Some examples are:

Examples

Now that you've learned about the most commonly used Airflow SQL operators, you'll use the operators in some SQL use cases. For this guide you'll use Snowflake, but the concepts shown can be adapted for other databases. Some of the environment setup for each example makes use of the Astro CLI and Astro project structure, but you can also adapt this setup for use with Apache Airflow.

Example 1: Execute a query

In this first example, a DAG executes two simple interdependent queries using SQLExecuteQueryOperator.

First you need to define your DAG:

from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    "call_snowflake_sprocs",
    start_date=datetime(2020, 6, 1),
    max_active_runs=3,
    schedule="@daily",
    default_args=default_args,
    template_searchpath="/usr/local/airflow/include",
    catchup=False,
)
def call_snowflake_sprocs():
    opr_call_sproc1 = SQLExecuteQueryOperator(
        task_id="call_sproc1", conn_id="snowflake", sql="call-sproc1.sql"
    )

    opr_call_sproc2 = SQLExecuteQueryOperator(
        task_id="call_sproc2", conn_id="snowflake", sql="call-sproc2.sql"
    )

    opr_call_sproc1 >> opr_call_sproc2


call_snowflake_sprocs()

The template_searchpath argument in the DAG definition tells the DAG to look in the given folder for scripts, so you can now add two SQL scripts to your project. In this example, those scripts are call-sproc1.sql and call-sproc2.sql, which contain the following SQL code respectively:

-- call-sproc1
CALL sp_pi();
-- call-sproc2
CALL sp_pi_squared();

sp_pi() and sp_pi_squared() are two stored procedures that are defined in a Snowflake instance. Note that the SQL in these files could be any type of query you need to execute. Sprocs are used here only as an example.

Finally, you need to set up a connection to Snowflake. There are a few ways to manage connections using Astronomer, including IAM roles, secrets managers, and the Airflow API. For this example, set up a connection using the Airflow UI. Because the connection in the DAG is called snowflake, your configured connection should look something like this:

Configured Snowflake connection in the Airflow UI

With the connection established, you can now run the DAG to execute the SQL queries.
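If you'd rather not use the UI, Airflow 2.3 and later can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a JSON-serialized connection. The sketch below builds that name and value for the `snowflake` connection ID used in this DAG. All credentials and the `account`, `warehouse`, `database`, and `role` values are placeholders, and the exact extra fields your Snowflake provider version expects are an assumption to check against the provider documentation.

```python
import json

conn_id = "snowflake"

# Placeholder values: replace with your own account details.
connection = {
    "conn_type": "snowflake",
    "login": "MY_USER",
    "password": "MY_PASSWORD",
    "schema": "MY_SCHEMA",
    "extra": {
        "account": "my_account",
        "warehouse": "MY_WAREHOUSE",
        "database": "MY_DATABASE",
        "role": "MY_ROLE",
    },
}

# Airflow looks for AIRFLOW_CONN_<CONN_ID>, with the connection ID upper-cased.
env_var_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_var_value = json.dumps(connection)

print(env_var_name)
print(env_var_value)
```

Set the printed name and value as an environment variable wherever your Airflow components run; avoid committing real credentials to source control.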

Example 2: Execute a query with parameters

Using Airflow, you can also parameterize your SQL queries to make them more dynamic. For example, suppose you have a query that selects data from a table for a date that should change with each run. You can execute the query using the same setup as in Example 1, with a few adjustments.

Your DAG will look like the following:

from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import datetime, duration

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=1),
}


@dag(
    start_date=datetime(2020, 6, 1),
    max_active_runs=3,
    schedule="@daily",
    default_args=default_args,
    template_searchpath="/usr/local/airflow/include",
    catchup=False,
)
def parameterized_query():
    opr_param_query = SQLExecuteQueryOperator(
        task_id="param_query", conn_id="snowflake", sql="param-query.sql"
    )

    opr_param_query


parameterized_query()

The DAG is essentially the same as the one in Example 1. The difference is in the query itself, stored in param-query.sql:

SELECT *
FROM STATE_DATA
WHERE date = '{{ yesterday_ds_nodash }}'

In this example, the query is parameterized to select data for yesterday's date dynamically, using the built-in Airflow template variable yesterday_ds_nodash wrapped in double curly brackets (Jinja syntax). The rendered template in the Airflow UI looks like this:

Rendered Template
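To make the rendering concrete, the stdlib sketch below computes what `{{ yesterday_ds_nodash }}` evaluates to for a given logical date (one day before the run's `ds`, formatted as `YYYYMMDD`) and substitutes it into the query text. The plain string replacement in `render` is a hypothetical stand-in for Airflow's Jinja rendering, used only so the example runs without Airflow installed.

```python
from datetime import date, timedelta

QUERY_TEMPLATE = """SELECT *
FROM STATE_DATA
WHERE date = '{{ yesterday_ds_nodash }}'"""


def yesterday_ds_nodash(logical_date: date) -> str:
    # One day before the logical date, formatted without dashes.
    return (logical_date - timedelta(days=1)).strftime("%Y%m%d")


def render(template: str, logical_date: date) -> str:
    # Simplified stand-in for Jinja: replace the single known placeholder.
    return template.replace(
        "{{ yesterday_ds_nodash }}", yesterday_ds_nodash(logical_date)
    )


print(render(QUERY_TEMPLATE, date(2020, 6, 2)))
# SELECT *
# FROM STATE_DATA
# WHERE date = '20200601'
```

Make sure the rendered format matches how your date column is stored; `{{ ds }}`, for example, renders as `YYYY-MM-DD` instead.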

Astronomer recommends using Airflow template variables or macros whenever possible to increase flexibility and make your workflows idempotent. The same approach works with any value available in the template context. For example, you could reference an Airflow Variable named state_variable:

SELECT *
FROM STATE_DATA
WHERE state = '{{ var.value.state_variable }}'

If you need a parameter that isn't available as a built-in variable or macro, such as a value from another task in your DAG, you can pass it to your query through the operator's parameters argument:

opr_param_query = SQLExecuteQueryOperator(
    task_id="param_query",
    conn_id="snowflake",
    sql="param-query.sql",
    # mydatevariable is a placeholder for the value you want to bind
    parameters={"my_date": mydatevariable},
)

Then reference that parameter in your SQL file:

SELECT *
FROM STATE_DATA
WHERE date = %(my_date)s
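Passing values through `parameters` means the database driver binds them, rather than your code splicing them into the SQL string, which handles quoting for you and protects against SQL injection. The stdlib sketch below shows the same idea with SQLite. One difference to note: SQLite's driver uses `:name` named placeholders rather than the `%(name)s` style shown above for Snowflake; the placeholder syntax depends on each driver's `paramstyle`. The `state_data` table is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state_data (state TEXT, date TEXT)")
conn.executemany(
    "INSERT INTO state_data VALUES (?, ?)",
    [("CA", "2022-09-01"), ("NY", "2022-09-01"), ("TX", "2022-09-02")],
)

# SQLite reports "qmark" as its paramstyle but also accepts :name placeholders.
print(sqlite3.paramstyle)  # qmark

my_date = "2022-09-01"
rows = conn.execute(
    "SELECT state FROM state_data WHERE date = :my_date ORDER BY state",
    {"my_date": my_date},
).fetchall()
print(rows)  # [('CA',), ('NY',)]

# A malicious value is bound as plain data, so it matches nothing.
injected = conn.execute(
    "SELECT state FROM state_data WHERE date = :my_date",
    {"my_date": "x' OR '1'='1"},
).fetchall()
print(injected)  # []
conn.close()
```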

Alternatively, you can use Jinja templates in your SQL statements and pass the relevant values using the params argument:

opr_param_query = SQLExecuteQueryOperator(
    task_id="param_query",
    conn_id="snowflake",
    sql="param-query.sql",
    params={"my_date": "2022-09-01"},
)

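The value can be referenced using Jinja syntax in the SQL statement. Because params values are rendered into the SQL text rather than bound by the database driver, you need to add quotes around string values yourself:

SELECT *
FROM STATE_DATA
WHERE date = '{{ params.my_date }}'

tip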

Airflow params can also be defined at the DAG-level and passed to a DAG at runtime. Params passed at runtime will override param defaults defined at the DAG or the task level. For more information on params see the Create and use params in Airflow guide.

Example 3: Load data

The next example loads data from an external source into a database table. You'll pull data from an API and save it to a flat file on Amazon S3, which you can then load into Snowflake.

This example uses the S3ToSnowflakeOperator to reduce the amount of code you have to write.

First, create a DAG that pulls cat facts from an API endpoint, saves each one as a comma-separated values (CSV) file on S3, and loads each of those files into Snowflake using the transfer operator. Here's the DAG code:

from pendulum import datetime, duration

import requests
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator

S3_CONN_ID = "astro-s3-workshop"
BUCKET = "astro-workshop-bucket"
name = "cat_data"  # swap your name here


@task
def upload_to_s3(cat_fact_number):
    # Instantiate the S3 hook from the Airflow connection
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Base URL
    url = "https://catfact.ninja/fact"

    # Grab data; fail the task on HTTP errors or if the request hangs
    res = requests.get(url, timeout=30)
    res.raise_for_status()

    # Take string, upload to S3 using predefined method
    s3_hook.load_string(
        res.text,
        "cat_fact_{0}.csv".format(cat_fact_number),
        bucket_name=BUCKET,
        replace=True,
    )


number_of_cat_facts = 3


@dag(
    start_date=datetime(2020, 6, 1),
    max_active_runs=3,
    schedule="@daily",
    default_args={"retries": 1, "retry_delay": duration(minutes=5)},
    catchup=False,
)
def cat_data_s3_to_snowflake():
    t0 = EmptyOperator(task_id="start")

    # One upload task and one load task per cat fact
    for i in range(number_of_cat_facts):
        snowflake = S3ToSnowflakeOperator(
            task_id="upload_{0}_snowflake".format(i),
            s3_keys=["cat_fact_{0}.csv".format(i)],
            stage="cat_stage",
            table="CAT_DATA",
            schema="SANDBOX_KENTEND",
            file_format="cat_csv",
            snowflake_conn_id="snowflake",
        )

        t0 >> upload_to_s3(i) >> snowflake


cat_data_s3_to_snowflake()
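The upload task writes the raw response body, which this endpoint returns as a JSON object rather than CSV. If you want the file on S3 to match a CSV file format in Snowflake, you could convert the payload first. The sketch below assumes each response looks like `{"fact": "...", "length": 40}` (an assumption about the API's shape; verify it against a real response) and uses only the standard library. `fact_json_to_csv` is a hypothetical helper you could call inside `upload_to_s3` before `load_string`.

```python
import csv
import io
import json


def fact_json_to_csv(payload: str) -> str:
    """Convert one cat-fact JSON object into a single CSV row (no header)."""
    record = json.loads(payload)
    buffer = io.StringIO()
    # QUOTE_MINIMAL quotes a field only if it contains commas, quotes, or newlines.
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    writer.writerow([record["fact"], record["length"]])
    return buffer.getvalue()


sample = '{"fact": "Cats sleep a lot, mostly during the day.", "length": 40}'
print(fact_json_to_csv(sample), end="")
# "Cats sleep a lot, mostly during the day.",40
```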

This image shows a graph view of the DAG:

Cat-to-Snowflake Graph

There are a few things you need to configure in Snowflake to make this DAG work:

  • A table that will receive the data (CAT_DATA in this example).
  • A defined Snowflake stage (cat_stage) and file format (cat_csv). See the Snowflake documentation.

Next, set up your Airflow connections. This example requires two connections:

  • A connection to S3 (with the connection ID astro-s3-workshop in the DAG above).
  • A connection to Snowflake (with the connection ID snowflake; see Example 1 for a screenshot of what the connection should look like).

After this setup, you're ready to run the DAG!

Note that while this example is specific to Snowflake, the concepts apply to any database you might be using. If a transfer operator doesn't exist for your specific source and destination tools, you can always write your own (and maybe contribute it back to the Airflow project)!

Example 4: Use gusty

If you're unfamiliar with Airflow or Python, you can use gusty to generate DAGs directly from your SQL code.

You can install gusty in your Airflow environment by running pip install gusty from your command line. If you use the Astro CLI, you can alternatively add gusty to your Astro project requirements.txt file.

Once you've installed gusty in your Airflow environment, you can turn your SQL files into Airflow tasks by adding YAML instructions to the front matter of your SQL file. The front matter is the section at the top of the SQL file enclosed in ---.

The example below shows how to turn a simple SQL statement into an Airflow task using the PostgresOperator with the postgres_default connection ID. This SQL file task_1.sql defines a task that will only run once task_0 of the same DAG has completed successfully.

---
operator: airflow.providers.postgres.operators.postgres.PostgresOperator
conn_id: postgres_default
dependencies:
- task_0
---

SELECT column_1
FROM table_2;
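Conceptually, gusty reads the YAML between the two `---` lines as task configuration and treats the rest of the file as the SQL to run. The stdlib-only sketch below shows just that split. `split_front_matter` is a hypothetical helper, not part of gusty's API, and it returns the YAML as raw text because parsing it would require a YAML library such as PyYAML.

```python
def split_front_matter(text: str) -> tuple:
    """Split a gusty-style SQL file into (front_matter_yaml, sql_body)."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return "", text.strip()  # No front matter: the whole file is SQL.
    # Find the closing '---' delimiter after the opening one.
    for end, line in enumerate(lines[1:], start=1):
        if line.strip() == "---":
            front_matter = "\n".join(lines[1:end])
            body = "\n".join(lines[end + 1:]).strip()
            return front_matter, body
    raise ValueError("Front matter opened with '---' but never closed")


task_1_sql = """---
operator: airflow.providers.postgres.operators.postgres.PostgresOperator
conn_id: postgres_default
dependencies:
- task_0
---

SELECT column_1
FROM table_2;
"""

yaml_text, sql = split_front_matter(task_1_sql)
print(sql)
# SELECT column_1
# FROM table_2;
```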

Add your modified SQL files within a gusty directory structure:

.
└── dags
    ├── my_gusty_dags
    │   └── my_dag_1
    │       ├── METADATA.yml
    │       ├── task_0.sql
    │       └── task_1.sql
    └── creating_gusty_dags.py

The rendered my_dag_1 DAG will contain two tasks defined as SQL files:

gusty graph

See METADATA.yml and creating_gusty_dags.py in the gusty documentation for example configurations.

Note that by default, gusty will add a LatestOnlyOperator to the root of your DAG. You can disable this behavior by passing latest_only=False to the create_dags function, or setting latest_only: False in the METADATA.yml.

Next steps

You've learned how to interact with your SQL database from Airflow. This guide didn't cover some topics, including:

  • How does it work behind the scenes?
  • What if you want to retrieve data with the PostgresOperator?
  • Is it scalable?

To learn more, take Astronomer's free Airflow: Branching course.
