Use a listener to send a Slack notification when a dataset is updated
Airflow listeners allow you to execute custom code when certain events occur anywhere in your Airflow instance, for example when any DAG run fails or any dataset is updated.
Listeners are implemented as an Airflow plugin and can contain any code. In this tutorial, you'll use a listener to send a Slack notification whenever any dataset is updated.
If you only need to implement notifications for specific DAGs and tasks, consider using Airflow callbacks instead.
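For context, a callback is scoped to a single DAG or task rather than the entire Airflow instance. A minimal sketch of a DAG-level `on_failure_callback`, using a hypothetical DAG that is not part of this tutorial:

```python
# Minimal sketch (hypothetical DAG): a DAG-level on_failure_callback only
# fires for this one DAG, while a listener fires for events anywhere in
# your Airflow instance.
from airflow.decorators import dag, task
from pendulum import datetime


def notify_on_failure(context):
    # The context dictionary contains metadata about the failed DAG run
    print(f"DAG {context['dag'].dag_id} failed!")


@dag(
    start_date=datetime(2023, 12, 1),
    schedule=None,
    catchup=False,
    on_failure_callback=notify_on_failure,  # scoped to this DAG only
)
def my_callback_dag():
    @task
    def my_task():
        return "hello"

    my_task()


my_callback_dag()
```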
The `on_dataset_created` and `on_dataset_changed` listeners are currently considered experimental and might be subject to breaking changes in future releases.
Time to complete
This tutorial takes approximately 15 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow plugins. See Airflow plugins.
- Airflow datasets. See Datasets and data-aware scheduling in Airflow.
Prerequisites
- The Astro CLI and an Astro project using Astro Runtime 10+ (Airflow 2.8+).
- A Slack workspace with an Incoming Webhook configured.
Step 1: Configure your Astro project
- Create a new Astro project:

  ```sh
  $ mkdir astro-listener-tutorial && cd astro-listener-tutorial
  $ astro dev init
  ```

- Add the following line to your Astro project `requirements.txt` file to install the Slack Airflow provider:

  ```text
  apache-airflow-providers-slack==8.4.0
  ```

- Add the following environment variable to your Astro project `.env` file to create an Airflow connection to Slack. Make sure to replace `<your-slack-webhook-token>` with your own Slack webhook token in the format of `T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX`.

  ```text
  AIRFLOW_CONN_SLACK_WEBHOOK_CONN='{
      "conn_type": "slackwebhook",
      "host": "https://hooks.slack.com/services/",
      "password": "<your-slack-webhook-token>"
  }'
  ```
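Before building the listener, you can optionally confirm that the connection works by sending a test message through it. This is a minimal sketch assuming your Airflow environment is running and that the connection ID `slack_webhook_conn` is derived from the environment variable above; run it, for example, in a throwaway task or a Python session inside your scheduler container.

```python
# Optional sanity check (not part of the tutorial files): send a test
# message through the Slack webhook connection defined in .env.
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

hook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook_conn")
hook.send(text="Testing the Slack webhook connection!")
```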
Step 2: Create your listener
To define an Airflow listener, you add the code you want to execute to a relevant `@hookimpl`-decorated listener function. In this example, you define your code in the `on_dataset_changed` function to run whenever any dataset is updated.
- Create a new file called `listener_code.py` in your `plugins` folder.

- Copy the following code into the file:

  ```python
  from datetime import datetime

  from airflow.datasets import Dataset
  from airflow.listeners import hookimpl
  from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

  SLACK_CONN_ID = "slack_webhook_conn"


  @hookimpl
  def on_dataset_changed(dataset: Dataset):
      """Execute if a dataset is updated."""
      print("I am always listening for any Dataset changes and I heard that!")
      print("Posting to Slack...")
      hook = SlackWebhookHook(slack_webhook_conn_id=SLACK_CONN_ID)
      hook.send(text="A dataset was changed!")
      print("Done!")

      # Run extra code only for updates to one specific dataset
      if dataset.uri == "file://include/bears":
          print("Oh! This is the bears dataset!")
          print("Bears are great :)")
          start_date = datetime.now().date()
          end_date = datetime(2024, 10, 4).date()
          days_until = (end_date - start_date).days
          print(f"Only approximately {days_until} days until fat bear week!")
  ```
This listener is defined using the `on_dataset_changed` hookspec. It posts a message to Slack whenever any dataset is updated and executes additional print statements if the updated dataset has the URI `file://include/bears`.
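The `on_dataset_created` hookspec mentioned earlier follows the same pattern. As a sketch, assuming you add it to the same `listener_code.py` file, a listener that fires when a new dataset is first registered could look like this:

```python
@hookimpl
def on_dataset_created(dataset: Dataset):
    """Execute if a new dataset is created."""
    print(f"A new dataset with the URI {dataset.uri} was created!")
```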
Step 3: Create the listener plugin
For Airflow to recognize your listener, you need to create a plugin that registers it.
- Create a new file called `listener_plugin.py` in your `plugins` folder.

- Copy the following code into the file:

  ```python
  from airflow.plugins_manager import AirflowPlugin

  from plugins import listener_code


  class MyListenerPlugin(AirflowPlugin):
      name = "my_listener_plugin"
      listeners = [listener_code]
  ```

- If your local Airflow environment is already running, restart it to apply the changes to your plugins.
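To confirm that Airflow registered the listener, you can run `astro dev run plugins`, which executes the `airflow plugins` CLI command in your local environment; `my_listener_plugin` should appear in the output.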
Step 4: Create your DAG
- In your `dags` folder, create a file called `producer_dag.py`.

- Copy the following code into the file:

  ```python
  """
  ## DAG to produce to a Dataset showcasing the on_dataset_changed listener

  This DAG will produce to a Dataset, updating it, which triggers the
  on_dataset_changed listener defined in an Airflow plugin.
  The DAG also shows the difference between a Dataset and ObjectStoragePath.
  """

  from airflow.datasets import Dataset
  from airflow.decorators import dag, task
  from airflow.io.path import ObjectStoragePath
  from pendulum import datetime
  import requests

  URI = "file://include/bears"
  MY_DATASET = Dataset(URI)

  base_local = ObjectStoragePath(URI)


  @dag(
      start_date=datetime(2023, 12, 1),
      schedule="0 0 * * 0",
      catchup=False,
      doc_md=__doc__,
      tags=["on_dataset_changed listener", "2-8"],
  )
  def producer_dag():
      @task(
          outlets=[MY_DATASET],
      )
      def get_bear(base):
          # Fetch a bear picture and write it to local object storage
          r = requests.get("https://placebear.com/200/300")
          file_path = base / "bear.jpg"

          if r.status_code == 200:
              base.mkdir(parents=True, exist_ok=True)
              file_path.write_bytes(r.content)
          else:
              print(f"Failed to retrieve image. Status code: {r.status_code}")

      get_bear(base=base_local)


  producer_dag()
  ```

This simple DAG contains one task that queries the placebear API and writes the retrieved image to a local `.jpg` file in the `include` folder using the Airflow object storage feature. The task produces an update to the `file://include/bears` dataset, which triggers the listener you created in Step 2.
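Listeners are independent of data-aware scheduling: the same dataset update can also trigger downstream DAGs. As an optional, hypothetical sketch that is not part of this tutorial, a consumer DAG scheduled on the `file://include/bears` dataset could look like this:

```python
# Hypothetical consumer DAG (not required for this tutorial): it runs on
# every update to the file://include/bears dataset, while the listener
# fires on updates to any dataset in the environment.
from airflow.datasets import Dataset
from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2023, 12, 1),
    schedule=[Dataset("file://include/bears")],  # data-aware schedule
    catchup=False,
)
def consumer_dag():
    @task
    def celebrate():
        print("A new bear picture has arrived!")

    celebrate()


consumer_dag()
```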
Step 5: Run your DAG
- Run `astro dev start` in your Astro project to start Airflow, then open the Airflow UI at `localhost:8080`.

- In the Airflow UI, run the `producer_dag` DAG by clicking the play button.

- After the DAG run completes, go to the task logs of the `get_bear` task to see the print statements from your listener plugin:

  ```text
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - I am always listening for any Dataset changes and I heard that!
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - Posting to Slack...
  [2023-12-17, 14:46:51 UTC] {base.py:83} INFO - Using connection ID 'slack_webhook_conn' for task execution.
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - Done!
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - Oh! This is the bears dataset!
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - Bears are great :)
  [2023-12-17, 14:46:51 UTC] {logging_mixin.py:188} INFO - Only approximately 292 days until fat bear week!
  ```

- Open your Slack workspace to see a new message from your webhook.

- (Optional) View your complimentary bear picture at `include/bears/bear.jpg`.
Conclusion
Congratulations! You now know how to create an Airflow listener to run custom code whenever any dataset is updated in your whole Airflow environment. Following the same pattern, you can implement listeners for other events, such as when any task fails, any DAG starts running, or a lifecycle event occurs.
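For example, a listener that reacts to any failed task instance uses the `on_task_instance_failed` hookspec. A minimal sketch, assuming the Airflow 2.8 hookspec signature and the same `plugins` registration pattern shown in this tutorial:

```python
from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState
from sqlalchemy.orm.session import Session


@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: Session
):
    """Execute when any task instance in the environment fails."""
    print(
        f"Task {task_instance.task_id} in DAG {task_instance.dag_id} "
        f"went from {previous_state} to failed!"
    )
```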