Orchestrate Great Expectations with Airflow
Great Expectations (GX) is an open source Python-based data validation framework. You test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML, then run validations with those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. Astronomer, with help from Superconductive, maintains the Great Expectations Airflow Provider, which gives users a convenient method for running validations directly from their DAGs.
This tutorial shows how to use the GreatExpectationsOperator in an Airflow DAG, leveraging automatic creation of a default Checkpoint and connecting via an Airflow connection.
Time to complete
This tutorial takes approximately 20 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of Great Expectations. See Try GX Core.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
Prerequisites
- The Astro CLI.
- Access to a SQL database. This tutorial uses a local Postgres instance.
- (Optional) A local installation of the `great_expectations` package.
Step 1: Configure your Astro project
To use GX with Airflow, install the Great Expectations Airflow Provider in your Astro project.
- Create a new Astro project:

  ```sh
  $ mkdir astro-gx-tutorial && cd astro-gx-tutorial
  $ astro dev init
  ```

- Add the GX Airflow provider to your Astro project `requirements.txt` file:

  ```text
  airflow-provider-great-expectations==0.2.7
  ```

- Add the following GX dependency to your Astro project `packages.txt` file:

  ```text
  libgeos-c1v5
  ```
Step 2: Configure a GX project
The Great Expectations Airflow Provider requires a GX project to be present in your Airflow environment. The easiest way to create a GX project is by using the `great_expectations` package and following the steps below. If you cannot install the GX package locally, you can copy the `great_expectations` folder from this GitHub repository into your Astro project `include` folder instead and continue this tutorial at Step 3.
- In your `include` folder, create a new file called `gx_init_script.py`. Then, copy and paste the following code into the file:

  ```python
  import great_expectations as gx

  context = gx.get_context()
  context = context.convert_to_file_context()
  ```

- Go to your `include` folder and run the `gx_init_script.py` file to instantiate a Data Context at `include/great_expectations`:

  ```sh
  $ cd include
  $ python gx_init_script.py
  ```
Create a new file in your
include/great_expectations/expectations
folder calledstrawberry_suite.json
. Then, copy and paste the following code into the file:{
"data_asset_type": null,
"expectation_suite_name": "strawberry_suite",
"expectations": [
{
"expectation_context": {
"description": null
},
"expectation_type": "expect_table_row_count_to_be_between",
"ge_cloud_id": null,
"kwargs": {
"min_value": 2,
"max_value": 1000
},
"meta": {}
}
],
"ge_cloud_id": null,
"meta": {
"great_expectations_version": "0.15.34"
}
}This JSON file defines an Expectation Suite containing one expectation of the type
expect_table_row_count_to_be_between
. This expectation will check that the number of rows in a table is between 2 and 1000.You should now have the following file structure in your
include
folder:└── include
└── great_expectations
│ ├── checkpoints
│ ├── expectations
│ │ └── strawberry_suite.json
│ ├── plugins
│ ├── profilers
│ ├── uncommitted
│ ├── .gitignore
│ └── great_expectations.yml
└── gx_init_script.py -
Run
astro dev start
to start your Astro project.
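If you later want the suite to check more than the row count, additional Expectations are added as further entries in the `expectations` list of `strawberry_suite.json`. Below is a minimal sketch of one more entry, assuming the validated data has an `amount` column (the table created in Step 4 does); the exact fields available depend on your GX version:

```json
{
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
        "column": "amount"
    },
    "meta": {}
}
```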
Step 3: Create a database connection
The easiest way to use GX with Airflow is to let the GreatExpectationsOperator create a default Checkpoint and Datasource based on an Airflow connection. To set up a connection to a Postgres database, complete the following steps:
- In the Airflow UI at `localhost:8080`, go to Admin -> Connections and click +.

- Create a new connection named `postgres_default` using the following information:

  - Connection Id: `postgres_default`.
  - Connection Type: `Postgres`.
  - Host: `<your postgres host>`.
  - Login: `<your postgres username>`.
  - Password: `<your postgres password>`.
  - Schema: `postgres`. Note that this is the name of the PostgreSQL database, not of the schema in the database. See the PostgreSQL provider documentation.
  - Port: `<your postgres port>`.

- Click Save.
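Alternatively, Airflow connections can be supplied as environment variables. The sketch below, which assumes the standard `AIRFLOW_CONN_<CONN_ID>` URI convention, defines the same connection in your Astro project's `.env` file; substitute your own host, port, and credentials:

```text
# .env — hypothetical values, replace with your own
AIRFLOW_CONN_POSTGRES_DEFAULT='postgres://<your postgres username>:<your postgres password>@<your postgres host>:<your postgres port>/postgres'
```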
Step 4: Create a DAG
- Create a new file in your `dags` folder called `gx_tutorial.py`.

- Copy and paste the following DAG code into the file:
  ```python
  from pendulum import datetime

  from airflow.decorators import dag
  from airflow.providers.postgres.operators.postgres import PostgresOperator
  from great_expectations_provider.operators.great_expectations import (
      GreatExpectationsOperator,
  )

  POSTGRES_CONN_ID = "postgres_default"
  MY_POSTGRES_SCHEMA = "sweet_treats"
  MY_GX_DATA_CONTEXT = "include/great_expectations"


  @dag(
      start_date=datetime(2023, 7, 1),
      schedule=None,
      catchup=False,
  )
  def gx_tutorial():
      create_table_pg = PostgresOperator(
          task_id="create_table_pg",
          postgres_conn_id=POSTGRES_CONN_ID,
          sql=f"""
              CREATE SCHEMA IF NOT EXISTS {MY_POSTGRES_SCHEMA};
              CREATE TABLE {MY_POSTGRES_SCHEMA}.strawberries (
                  id VARCHAR(10) PRIMARY KEY,
                  name VARCHAR(100),
                  amount INT
              );
              INSERT INTO {MY_POSTGRES_SCHEMA}.strawberries (id, name, amount)
              VALUES ('001', 'Strawberry Order 1', 10),
                  ('002', 'Strawberry Order 2', 5),
                  ('003', 'Strawberry Order 3', 8),
                  ('004', 'Strawberry Order 4', 3),
                  ('005', 'Strawberry Order 5', 12);
          """,
      )

      gx_validate_pg = GreatExpectationsOperator(
          task_id="gx_validate_pg",
          conn_id=POSTGRES_CONN_ID,
          data_context_root_dir=MY_GX_DATA_CONTEXT,
          schema=MY_POSTGRES_SCHEMA,
          data_asset_name="strawberries",
          expectation_suite_name="strawberry_suite",
          return_json_dict=True,
      )

      drop_table_pg = PostgresOperator(
          task_id="drop_table_pg",
          postgres_conn_id=POSTGRES_CONN_ID,
          sql=f"""
              DROP TABLE {MY_POSTGRES_SCHEMA}.strawberries;
          """,
      )

      create_table_pg >> gx_validate_pg >> drop_table_pg


  gx_tutorial()
  ```

  This DAG will create a table in your Postgres database, run a GX validation on the table, and then drop the table.
  The data in the table is validated using the GreatExpectationsOperator (GXO). The operator automatically creates a default Checkpoint and Datasource based on the `postgres_default` connection and runs the Expectations defined in the `strawberry_suite.json` file on the `strawberries` table. Note that instead of using the `schema` parameter of the GXO, you can also provide the schema name to the `data_asset_name` parameter in the form of `my_schema_name.my_table_name`, as sketched after these steps.
. -
Open Airflow at
http://localhost:8080/
. Run the DAG manually by clicking the play button.
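As noted above, the `schema` parameter can be dropped if the schema is included in `data_asset_name`. A minimal sketch of that variant of the validation task, reusing the imports and constants from the tutorial DAG:

```python
# Hypothetical variant of the gx_validate_pg task from the DAG above:
# the schema is passed as part of data_asset_name instead of the schema parameter.
gx_validate_pg = GreatExpectationsOperator(
    task_id="gx_validate_pg",
    conn_id=POSTGRES_CONN_ID,
    data_context_root_dir=MY_GX_DATA_CONTEXT,
    data_asset_name=f"{MY_POSTGRES_SCHEMA}.strawberries",
    expectation_suite_name="strawberry_suite",
    return_json_dict=True,
)
```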
By default, the `GreatExpectationsOperator` pushes a `CheckpointResult` object to XCom. You can instead return a JSON-serializable dictionary by setting the `return_json_dict` parameter to `True`.

If you do not want to use this built-in serialization, you can either enable XCom pickling by setting the environment variable `AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True`, or use a custom serialization method in a custom XCom backend.
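With `return_json_dict=True`, the value pushed to XCom is a plain dictionary, so downstream tasks can inspect it like any other XCom. A minimal sketch, assuming the serialized result exposes a top-level `success` key (verify the structure of your own results before relying on it):

```python
from airflow.decorators import task


@task
def report_validation(result: dict) -> None:
    # Log whether the whole Checkpoint run passed.
    print(f"Validation passed: {result.get('success')}")


# Inside the DAG function, downstream of the validation task:
# report_validation(gx_validate_pg.output)
```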
How it works
The GreatExpectationsOperator is a versatile operator allowing you to integrate GX into your Airflow environment. This tutorial shows the simplest way of using the operator by letting it create a default Checkpoint and Datasource based on an Airflow connection.
The GreatExpectationsOperator also allows you to pass in a CheckpointConfig object using the `checkpoint_config` parameter or a `checkpoint_kwargs` dictionary. You can also customize the Execution Engines and pass DataContextConfig objects by configuring operator parameters.
For more examples, check out the Get Improved Data Quality Checks in Airflow with the Updated Great Expectations Operator blog post and the Airflow data quality demo repository.
Operator parameters
The GreatExpectationsOperator is highly customizable to allow expert GX users to use their custom objects. This section explains some of the most commonly used parameters. Refer to the GX documentation for in-depth explanations of GX concepts.
When using the GreatExpectationsOperator, you must pass in one of the following parameters:
- `data_context_root_dir` (str): The path to your GX project directory. This is the directory containing your `great_expectations.yml` file.
- `data_context_config` (DataContextConfig): To use an in-memory Data Context, a `DataContextConfig` must be defined and passed to the operator. See also this example DataContextConfig.
Next, the operator will determine the Checkpoint and Datasource used to run your GX validations:
- If your Data Context does not specify a Datasource and you do not pass in a Checkpoint, the operator builds a Datasource and Checkpoint for you based on the `conn_id` you pass in and runs the given Expectation Suite. This is the simplest way to use the operator and is what this tutorial shows.
- If your project's Data Context already specifies Datasources, all you need to pass is the Data Context and the Expectation Suite. The operator uses the Datasources in the Data Context to create a default Checkpoint.
- If your project's Store already contains a Checkpoint, you can specify its use by passing the name to the `checkpoint_name` parameter. The CheckpointStore is often in the `great_expectations/checkpoints/` path, so that `checkpoint_name = "strawberries.pass.chk"` would reference the file `great_expectations/checkpoints/strawberries/pass/chk.yml`.
- If you want to run a custom Checkpoint, you can either pass a CheckpointConfig object to `checkpoint_config` or a dictionary of your Checkpoint config to `checkpoint_kwargs`. `checkpoint_kwargs` can also be used to specify additional overwriting configurations. See the example CheckpointConfig.
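As a sketch of the last option, a single Checkpoint setting can be overridden through `checkpoint_kwargs` while everything else is still generated from the connection. The `run_name_template` key used here is an assumption based on the Checkpoint config schema of GX 0.15; check the schema of your GX version:

```python
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

gx_validate_pg = GreatExpectationsOperator(
    task_id="gx_validate_pg",
    conn_id="postgres_default",
    data_context_root_dir="include/great_expectations",
    schema="sweet_treats",
    data_asset_name="strawberries",
    expectation_suite_name="strawberry_suite",
    # Hypothetical override: name each validation run after the run date.
    checkpoint_kwargs={"run_name_template": "%Y%m%d-strawberry-run"},
    return_json_dict=True,
)
```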
The Datasource can also be a pandas DataFrame, as shown in Running GX validations on pandas DataFrames. Depending on how you define your Datasource, the `data_asset_name` parameter has to be adjusted:
- If a pandas DataFrame is passed, the `data_asset_name` parameter can be any name that helps you identify the DataFrame.
- If a `conn_id` is supplied, the `data_asset_name` must be the name of the table the Expectation Suite runs on.
By default, a Great Expectations task runs validations and raises an `AirflowException` if any of the tests fail. To override this behavior and continue running the pipeline even if tests fail, set the `fail_task_on_validation_failure` flag to `False`.
In Astronomer or any environment with OpenLineage configured, the `GreatExpectationsOperator` automatically adds the OpenLineage action to its default action list when no Checkpoint is passed to the operator. To turn off this feature, set the `use_open_lineage` parameter to `False`.
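Both behaviors are controlled by a single operator parameter each. A minimal sketch of a more lenient validation task (the task name is illustrative):

```python
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

gx_validate_pg_lenient = GreatExpectationsOperator(
    task_id="gx_validate_pg_lenient",
    conn_id="postgres_default",
    data_context_root_dir="include/great_expectations",
    schema="sweet_treats",
    data_asset_name="strawberries",
    expectation_suite_name="strawberry_suite",
    # Keep downstream tasks running even if Expectations fail.
    fail_task_on_validation_failure=False,
    # Skip adding the OpenLineage action to the default action list.
    use_open_lineage=False,
)
```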
For a full list of parameters, see the Astronomer registry. For more information about the parameters and examples, see the README in the provider repository.
Running GX validations on pandas DataFrames
The GreatExpectationsOperator can also be used to run validations on CSV files by passing them in as a pandas DataFrame. This pattern is useful to test pipelines locally with small amounts of data. Note that the `execution_engine` parameter needs to be adjusted.
```python
import pandas as pd

gx_validate_pg = GreatExpectationsOperator(
    task_id="gx_validate_pg",
    data_context_root_dir="include/great_expectations",
    dataframe_to_validate=pd.read_csv("include/strawberries.csv"),
    execution_engine="PandasExecutionEngine",
    expectation_suite_name="strawberry_suite",
    return_json_dict=True,
)
```
Connections and backends
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend compatible with GX. This includes BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, and Athena, among others. All that's needed to point the GreatExpectationsOperator at an external dataset is to set up an Airflow connection to the Datasource and set the `conn_id` parameter. Connections still work if you have your connection configured in both Airflow and Great Expectations, as long as the correct Datasource is specified in the Checkpoint passed to the operator.
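Switching backends usually only means switching the connection. A minimal sketch pointing the same Expectation Suite at a Snowflake table, assuming a `snowflake_default` Airflow connection and an existing `MY_SCHEMA.STRAWBERRIES` table (both hypothetical):

```python
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

gx_validate_snowflake = GreatExpectationsOperator(
    task_id="gx_validate_snowflake",
    conn_id="snowflake_default",  # hypothetical Airflow connection to Snowflake
    data_context_root_dir="include/great_expectations",
    data_asset_name="MY_SCHEMA.STRAWBERRIES",  # hypothetical schema.table in Snowflake
    expectation_suite_name="strawberry_suite",
    return_json_dict=True,
)
```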