Airflow sensors
Apache Airflow sensors are a special kind of operator designed to wait for something to happen. When sensors run, they check to see if a certain condition is met before they are marked successful and let their downstream tasks execute. When used properly, they can be a great tool for making your DAGs more event-driven.
In this guide, you'll learn how sensors are used in Airflow, best practices for implementing sensors in production, and how to use deferrable versions of sensors.
There are multiple resources for learning about this topic. See also:
- Astronomer Academy: Airflow: Sensors module.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Basic Python. See the Python Documentation.
Sensor basics
Sensors are a type of operator that checks if a condition is met at a specific interval. If the condition is met, the task is marked successful and the DAG can move to downstream tasks. If the condition isn't met, the sensor waits for another interval before checking again.
All sensors inherit from the BaseSensorOperator and have the following parameters:
- mode: How the sensor operates. There are two types of modes:
  - poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
  - reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.
- poke_interval: When using poke mode, this is the time in seconds that the sensor waits before checking the condition again. The default is 60 seconds.
- exponential_backoff: When set to True, this setting creates exponentially longer wait times between pokes in poke mode.
- timeout: The maximum amount of time in seconds that the sensor checks the condition. If the condition is not met within the specified period, the task fails.
- soft_fail: If set to True, the task is marked as skipped if the condition is not met by the timeout.
Different types of sensors have different implementation details.
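To make these parameters concrete, here is a minimal sketch using the core FileSensor. The DAG ID, connection ID, file path, and timing values are hypothetical placeholders, not part of any example elsewhere in this guide.

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from pendulum import datetime

with DAG(
    dag_id="sensor_params_sketch",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
):
    # A minimal sketch, assuming a filesystem connection "fs_default" exists
    # and points at the directory containing the file.
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        fs_conn_id="fs_default",
        filepath="data/incoming/report.csv",  # hypothetical file to wait for
        mode="reschedule",  # release the worker slot between checks
        poke_interval=120,  # check every 2 minutes instead of the 60-second default
        timeout=60 * 60 * 2,  # stop checking after 2 hours
        soft_fail=True,  # mark the task as skipped instead of failed on timeout
    )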
Commonly used sensors
Many Airflow provider packages contain sensors that wait for various criteria in different source systems. The following are some of the most commonly used sensors:
- @task.sensor decorator: Allows you to turn any Python function that returns a PokeReturnValue into an instance of the BaseSensorOperator class. This way of creating a sensor is useful when checking for complex logic or if you are connecting to a tool via an API that has no specific sensor available.
- S3KeySensor: Waits for a key (file) to appear in an Amazon S3 bucket. This sensor is useful if you want your DAG to process files from Amazon S3 as they arrive.
- DateTimeSensor: Waits for a specified date and time. This sensor is useful if you want different tasks within the same DAG to run at different times.
- ExternalTaskSensor: Waits for an Airflow task to be completed. This sensor is useful if you want to implement cross-DAG dependencies in the same Airflow environment.
- HttpSensor: Waits for an API to be available. This sensor is useful if you want to ensure your API requests are successful.
- SqlSensor: Waits for data to be present in a SQL table. This sensor is useful if you want your DAG to process data as it arrives in your database.
To review the available Airflow sensors, go to the Astronomer Registry.
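As a rough sketch of what instantiating two of these sensors can look like, with hypothetical task IDs, DAG IDs, and times:

from airflow.sensors.date_time import DateTimeSensor
from airflow.sensors.external_task import ExternalTaskSensor

# Sketch only; these would sit inside a DAG definition.
# Wait until two hours after the end of the data interval (templated value).
wait_until_later = DateTimeSensor(
    task_id="wait_until_later",
    target_time="{{ data_interval_end.add(hours=2) }}",
)

# Wait for a task in another DAG in the same Airflow environment to finish.
# The DAG ID and task ID are hypothetical.
wait_for_upstream_dag = ExternalTaskSensor(
    task_id="wait_for_upstream_dag",
    external_dag_id="upstream_etl",
    external_task_id="load_to_warehouse",
    mode="reschedule",
)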
Example implementation
The following example DAG shows how you might use the SqlSensor:
TaskFlow API:
from airflow.decorators import task, dag
from airflow.providers.common.sql.sensors.sql import SqlSensor

from typing import Dict
from pendulum import datetime


def _success_criteria(record):
    return record


def _failure_criteria(record):
    return True if not record else False


@dag(
    description="DAG in charge of processing partner data",
    start_date=datetime(2021, 1, 1),
    schedule="@daily",
    catchup=False,
)
def partner():
    waiting_for_partner = SqlSensor(
        task_id="waiting_for_partner",
        conn_id="postgres",
        sql="sql/CHECK_PARTNER.sql",
        parameters={"name": "partner_a"},
        success=_success_criteria,
        failure=_failure_criteria,
        fail_on_empty=False,
        poke_interval=20,
        mode="reschedule",
        timeout=60 * 5,
    )

    @task
    def validation() -> Dict[str, str]:
        return {"partner_name": "partner_a", "partner_validation": True}

    @task
    def storing():
        print("storing")

    waiting_for_partner >> validation() >> storing()


partner()
Traditional syntax:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.sensors.sql import SqlSensor

from typing import Dict
from pendulum import datetime


def _success_criteria(record):
    return record


def _failure_criteria(record):
    return True if not record else False


with DAG(
    dag_id="partner",
    description="DAG in charge of processing partner data",
    start_date=datetime(2021, 1, 1),
    schedule="@daily",
    catchup=False,
):
    waiting_for_partner = SqlSensor(
        task_id="waiting_for_partner",
        conn_id="postgres",
        sql="sql/CHECK_PARTNER.sql",
        parameters={"name": "partner_a"},
        success=_success_criteria,
        failure=_failure_criteria,
        fail_on_empty=False,
        poke_interval=20,
        mode="reschedule",
        timeout=60 * 5,
    )

    def validation_function() -> Dict[str, str]:
        return {"partner_name": "partner_a", "partner_validation": True}

    validation = PythonOperator(
        task_id="validation", python_callable=validation_function
    )

    def storing_function():
        print("storing")

    storing = PythonOperator(task_id="storing", python_callable=storing_function)

    waiting_for_partner >> validation >> storing
This DAG waits for data to be available in a Postgres database before running the validation and storing tasks. The SqlSensor runs a SQL query and is marked successful when that query returns data, specifically when the result is not in the set (0, '0', '', None). The SqlSensor task in the example DAG (waiting_for_partner) runs the CHECK_PARTNER.sql script every 20 seconds (the poke_interval) until data is returned. The mode is set to reschedule, meaning the task does not occupy a worker slot between each 20-second interval. The timeout is set to 5 minutes, and the task fails if the data doesn't arrive within that time. When the SqlSensor condition is met, the DAG moves on to the downstream tasks. You can find the full code for this example in the webinar-sensors repo.
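As a hedged illustration of the success and failure callables: the SqlSensor passes the first cell of the first returned row to each callable, so you could, for example, wait for a status column to reach a specific value. The function names and column values below are hypothetical and differ from the example above.

# A minimal sketch, assuming the SQL query returns a single status value
# (e.g., 'ready', 'pending', or 'error') in its first column.
def _partner_ready(first_cell):
    # the sensor succeeds only once the partner data is marked ready
    return first_cell == "ready"


def _partner_errored(first_cell):
    # the sensor task fails immediately if the partner reported an error
    return first_cell == "error"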
Sensor decorator / PythonSensor
If no sensor exists for your use case, you can create your own using either the @task.sensor decorator or the PythonSensor. The @task.sensor decorator turns any Python function that returns a PokeReturnValue into an instance of the BaseSensorOperator. The PythonSensor takes a python_callable that returns True or False.
The following DAG shows how to use either the sensor decorator or the PythonSensor to create the same custom sensor:
@task.sensor:
"""
### Create a custom sensor using the @task.sensor decorator
This DAG showcases how to create a custom sensor using the @task.sensor decorator
to check the availability of an API.
"""
from airflow.decorators import dag, task
from pendulum import datetime
import requests
# importing the PokeReturnValue
from airflow.sensors.base import PokeReturnValue
@dag(start_date=datetime(2022, 12, 1), schedule="@daily", catchup=False)
def sensor_decorator():
# supply inputs to the BaseSensorOperator parameters in the decorator
@task.sensor(poke_interval=30, timeout=3600, mode="poke")
def check_dog_availability() -> PokeReturnValue:
r = requests.get("https://random.dog/woof.json")
print(r.status_code)
# set the condition to True if the API response was 200
if r.status_code == 200:
condition_met = True
operator_return_value = r.json()
else:
condition_met = False
operator_return_value = None
print(f"Woof URL returned the status code {r.status_code}")
# the function has to return a PokeReturnValue
# if is_done = True the sensor will exit successfully, if
# is_done=False, the sensor will either poke or be rescheduled
return PokeReturnValue(is_done=condition_met, xcom_value=operator_return_value)
# print the URL to the picture
@task
def print_dog_picture_url(url):
print(url)
print_dog_picture_url(check_dog_availability())
sensor_decorator()
Here, the @task.sensor decorator is applied to the check_dog_availability() function, which checks if a given API returns a 200 status code. If the API returns a 200 status code, the sensor task is marked as successful. If any other status code is returned, the sensor pokes again after the poke_interval has passed.
The optional xcom_value parameter in PokeReturnValue defines what data is pushed to XCom once is_done=True. You can use the data that was pushed to XCom in any downstream tasks.
"""
### Create a custom sensor using the PythonSensor
This DAG showcases how to create a custom sensor using the PythonSensor
to check the availability of an API.
"""
from airflow.decorators import dag, task
from pendulum import datetime
import requests
from airflow.sensors.python import PythonSensor
def check_dog_availability_func(**context):
r = requests.get("https://random.dog/woof.json")
print(r.status_code)
# set the condition to True if the API response was 200
if r.status_code == 200:
operator_return_value = r.json()
# pushing the link to the Dog picture to XCom
context["ti"].xcom_push(key="return_value", value=operator_return_value)
return True
else:
operator_return_value = None
print(f"Woof URL returned the status code {r.status_code}")
return False
@dag(
start_date=datetime(2022, 12, 1),
schedule=None,
catchup=False,
tags=["sensor"],
)
def pythonsensor_example():
# turn any Python function into a sensor
check_dog_availability = PythonSensor(
task_id="check_dog_availability",
poke_interval=10,
timeout=3600,
mode="reschedule",
python_callable=check_dog_availability_func,
)
# click the link in the logs for a cute picture :)
@task
def print_dog_picture_url(url):
print(url)
print_dog_picture_url(check_dog_availability.output)
pythonsensor_example()
Here, the PythonSensor uses check_dog_availability_func to check if a given API returns a 200 status code. If the API returns a 200 status code, the API response is pushed to XCom and the function returns True, causing the sensor task to be marked as successful. If any other status code is returned, check_dog_availability_func returns False and the sensor pokes again after the poke_interval has passed.
Sensor best practices
When using sensors, keep the following in mind to avoid potential performance issues:
- Always define a meaningful timeout parameter for your sensor. The default for this parameter is seven days, which is a long time for your sensor to be running. When you implement a sensor, consider your use case and how long you expect the sensor to wait and then define the sensor's timeout accurately.
- Whenever possible, and especially for long-running sensors, use the reschedule mode so your sensor is not constantly occupying a worker slot. This helps avoid deadlocks in Airflow where sensors take all of the available worker slots.
- If your poke_interval is very short (less than about 5 minutes), use the poke mode. Using reschedule mode in this case can overload your scheduler.
- Define a meaningful poke_interval based on your use case. There is no need for a task to check a condition every 60 seconds (the default) if you know the total amount of wait time will be 30 minutes.
Sensor failure modes
When using sensors, you have several options to define their behavior when an exception is raised within the sensor:
- soft_fail=True: If an exception is raised within the task, it is marked as skipped, affecting downstream tasks according to their defined trigger rules.
- silent_fail=True: If an exception is raised in the poke method that is not one of AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException, or AirflowFailException, the sensor logs the error but continues its execution.
- never_fail=True: (Airflow 2.10+) If the poke method raises any exception, the sensor task is skipped. This parameter is mutually exclusive with soft_fail.
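These flags are regular BaseSensorOperator arguments, so they can be set on most sensors. The following is a minimal sketch, assuming the HTTP provider is installed and using a hypothetical connection and endpoint:

from airflow.providers.http.sensors.http import HttpSensor

# Sketch only; this would sit inside a DAG definition.
wait_for_api = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="my_api",  # hypothetical HTTP connection
    endpoint="health",      # hypothetical endpoint to poll
    poke_interval=60,
    timeout=60 * 30,
    soft_fail=True,  # on failure or timeout, mark the task as skipped instead of failed
    # silent_fail=True,  # alternatively, log unexpected poke exceptions and keep checking
    # never_fail=True,   # Airflow 2.10+, mutually exclusive with soft_fail
)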
Deferrable operators
Deferrable operators (sometimes referred to as asynchronous operators) eliminate the problem of having any operator or sensor occupy a full worker slot for the entire time they run. Many operators have a deferrable parameter that can be set to True to make the operator deferrable. For the sensors where this parameter is not available, deferrable versions exist in open source Airflow and in the Astronomer Providers package. Astronomer recommends using these in most cases to reduce resource costs.
For DAG authors, using deferrable sensors is no different from using regular sensors. All you need to do is run a triggerer process in Airflow and either:
- Set the Airflow config operators.default_deferrable to True to set all sensors with a deferrable parameter to be deferrable by default.
- Set the deferrable parameter to True on individual sensor instances you want to run in deferrable mode, as shown in the sketch after this list.
- Replace the name of a sensor with its deferrable counterpart if no deferrable parameter is available.
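A minimal sketch of the second option, assuming a triggerer process is running and the Amazon provider is installed; the bucket, key, and connection ID are hypothetical:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Sketch only; this would sit inside a DAG definition.
wait_for_s3_file = S3KeySensor(
    task_id="wait_for_s3_file",
    aws_conn_id="aws_default",
    bucket_name="my-data-bucket",
    bucket_key="incoming/partner_a.csv",
    deferrable=True,  # hand the wait off to the triggerer instead of occupying a worker slot
    poke_interval=120,
    timeout=60 * 60,
)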
For more details, see Deferrable operators.