Airflow task groups

Airflow task groups are a tool to organize tasks into groups within your DAGs. Using task groups allows you to:

  • Organize complicated DAGs, visually grouping tasks that belong together in the Airflow UI Grid View.
  • Apply default_args to sets of tasks, instead of at the DAG level using DAG parameters.
  • Dynamically map over groups of tasks, enabling complex dynamic patterns.
  • Turn task patterns into modules that can be reused across DAGs or Airflow instances.

In this guide, you'll learn how to create and use task groups in your DAGs. You can find many example DAGs using task groups on the Astronomer GitHub.

When to use task groups

Task groups are most often used to visually organize complicated DAGs. For example, you might use task groups:

  • In big ELT/ETL DAGs, where you have a task group per table or schema.
  • In MLOps DAGs, where you have a task group per model being trained.
  • In DAGs owned by several teams, where you have task groups to visually separate the tasks that belong to each team. In this case, however, it might be better to split the DAG into multiple DAGs and use Datasets to connect them.
  • When you are using the same patterns of tasks in multiple DAGs and want to create a reusable module.
  • When you have an input of unknown length, for example an unknown number of files in a directory. You can use task groups to dynamically map over the input and create a task group performing sets of actions for each file. This is the only way to dynamically map sequential tasks in Airflow.

Define task groups

There are two ways to define task groups in your DAGs:

  • Use the TaskGroup class to create a task group context.
  • Use the @task_group decorator on a Python function.

In most cases, it is a matter of personal preference which method you use. The only exception is when you want to dynamically map over a task group; this is possible only when using @task_group.

The following code shows how to instantiate a simple task group containing two sequential tasks. You can use dependency operators (<< and >>) both within and between task groups in the same way that you can with individual tasks.

# from airflow.decorators import task_group
# from airflow.operators.empty import EmptyOperator

t0 = EmptyOperator(task_id='start')

# Start task group definition
@task_group(group_id='my_task_group')
def tg1():
    t1 = EmptyOperator(task_id='task_1')
    t2 = EmptyOperator(task_id='task_2')

    t1 >> t2
# End task group definition

t3 = EmptyOperator(task_id='end')

# Set task group's (tg1) dependencies
t0 >> tg1() >> t3

In the Grid View of the Airflow UI, task groups have a note showing how many tasks they contain. There are three ways to expand or collapse task groups:

  • Click on the note (for example +2 tasks).
  • Click the buttons on top of the task list.
  • Click the arrow next to names of task groups in the task list.

See the following GIF for examples of each of these options:

Task groups simple example

In Airflow 2.7 and later, task groups can be cleared and marked as success/failed just like individual tasks.

Task groups mark success/failed

Task group parameters

You can use parameters to customize individual task groups. The two most important parameters are group_id, which determines the name of your task group, and default_args, which are passed to all tasks in the task group. The following example shows a task group with some commonly configured parameters:

@task_group(
    group_id="task_group_1",
    default_args={"conn_id": "postgres_default"},
    tooltip="This task group is very important!",
    prefix_group_id=True,
    # parent_group=None,
    # dag=None,
)
def tg1():
    t1 = EmptyOperator(task_id="t1")

tg1()

In older Airflow versions that use the old Graph view, you can change the background and font color of the task group with the ui_color and ui_fgcolor parameters.

task_id in task groups

When your task is within a task group, the task_id you use to reference it is group_id.task_id. This ensures the task_id is unique across the DAG. It is important that you use this format when referring to specific tasks, for example when working with XComs or branching. You can disable this behavior by setting the task group parameter prefix_group_id=False.

For example, the task_1 task in the following DAG has a task_id of my_outer_task_group.my_inner_task_group.task_1.

@task_group(group_id="my_outer_task_group")
def my_outer_task_group():
    @task_group(group_id="my_inner_task_group")
    def my_inner_task_group():
        EmptyOperator(task_id="task_1")

    my_inner_task_group()

my_outer_task_group()
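
The naming rule can be sketched in plain Python. The helper qualified_task_id below is hypothetical and not part of Airflow. It assumes every enclosing task group keeps the default prefix_group_id=True and only shows how the ID you would pass to an XCom pull or return from a branching callable is composed.

```python
def qualified_task_id(task_id: str, *group_ids: str) -> str:
    """Build the task_id of a task nested in task groups.

    Hypothetical helper: `group_ids` lists the enclosing task groups from
    outermost to innermost and assumes prefix_group_id=True for all of them.
    """
    return ".".join([*group_ids, task_id])


# The nested example above
print(qualified_task_id("task_1", "my_outer_task_group", "my_inner_task_group"))
# prints: my_outer_task_group.my_inner_task_group.task_1

# A task outside any task group keeps its plain task_id
print(qualified_task_id("start"))
# prints: start
```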

Passing data through task groups

When you use the @task_group decorator, you can pass data through the task group just like with regular @task decorators:

from airflow.decorators import dag, task, task_group
from pendulum import datetime
import json


@dag(start_date=datetime(2023, 8, 1), schedule=None, catchup=False)
def task_group_example():
    @task
    def extract_data():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task
    def transform_sum(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task
    def transform_avg(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        avg_order_value = total_order_value / len(order_data_dict)

        return {"avg_order_value": avg_order_value}

    @task_group
    def transform_values(order_data_dict):
        return {
            "avg": transform_avg(order_data_dict),
            "total": transform_sum(order_data_dict),
        }

    @task
    def load(order_values: dict):
        print(
            f"""Total order value is: {order_values['total']['total_order_value']:.2f}
            and average order value is: {order_values['avg']['avg_order_value']:.2f}"""
        )

    load(transform_values(extract_data()))


task_group_example()

The resulting DAG is shown in the following image:

Decorated task group

There are a few things to consider when passing information into and out of task groups:

  • If downstream tasks require the output of tasks inside the decorated task group, the task group function must return a result. In the previous example, the function returns a dictionary with two values, one from each task in the task group, which is then passed to the downstream load() task.
  • If your task group function returns an output that another task takes as an input, Airflow can infer the task group and task dependency with the TaskFlow API. If your task group function's output isn't used as a task input, you must use the bit-shift operators (<< or >>) to define downstream dependencies to the task group.

Generate task groups dynamically at runtime

You can use dynamic task mapping with the @task_group decorator to dynamically map over task groups. The following DAG shows how you can dynamically map over a task group with different inputs for a given parameter.

from airflow.decorators import dag, task_group, task
from pendulum import datetime


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def task_group_mapping_example():
    # creating a task group using the decorator with the dynamic input my_num
    @task_group(group_id="group1")
    def tg1(my_num):
        @task
        def print_num(num):
            return num

        @task
        def add_42(num):
            return num + 42

        print_num(my_num) >> add_42(my_num)

    # a downstream task to print out resulting XComs
    @task
    def pull_xcom(**context):
        pulled_xcom = context["ti"].xcom_pull(
            # reference a task in a task group with task_group_id.task_id
            task_ids=["group1.add_42"],
            # only pull XComs from specific mapped task group instances (2.5 feature)
            map_indexes=[2, 3],
            key="return_value",
        )

        # will print out a list of results from map index 2 and 3 of the add_42 task
        print(pulled_xcom)

    # creating 6 mapped task group instances of the task group group1 (2.5 feature)
    tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

    # setting dependencies
    tg1_object >> pull_xcom()


task_group_mapping_example()

This DAG dynamically maps over the task group group1 with different inputs for the my_num parameter. Six mapped task group instances are created, one for each input. Within each mapped task group instance, two tasks run using that instance's value for my_num as an input. The pull_xcom() task downstream of the dynamically mapped task group shows how to access specific XCom values from a list of mapped task group instances (map_indexes).

For more information on dynamic task mapping, including how to map over multiple parameters, see Dynamic Tasks.

Order task groups

By default, using a loop to generate your task groups will put them in parallel. If your task groups are dependent on elements of another task group, you'll want to run them sequentially. For example, when loading tables with foreign keys, your primary table records need to exist before you can load your foreign table.

In the following example, the third task group generated in the loop has a foreign key constraint on both previously generated task groups (first and second iteration of the loop), so you'll want to process it last. To do this, you'll create an empty list and append your task group objects as they are generated. Using this list, you can reference the task groups and define their dependencies to each other:

groups = []
for g_id in range(1, 4):
    tg_id = f"group{g_id}"

    @task_group(group_id=tg_id)
    def tg1():
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        t1 >> t2

        if tg_id == "group1":
            t3 = EmptyOperator(task_id="task3")
            t1 >> t3

    groups.append(tg1())

[groups[0], groups[1]] >> groups[2]

The following image shows how these task groups appear in the Airflow UI:

Task group Dependencies

This example also shows how to add an additional task to group1 based on your group_id. Even when you're creating task groups in a loop to take advantage of patterns, you can still introduce variations to the pattern while avoiding code redundancies.

Nest task groups

For additional complexity, you can nest task groups by defining a task group indented within another task group. There is no limit to how many levels of nesting you can have.

groups = []
for g_id in range(1, 3):
    @task_group(group_id=f"group{g_id}")
    def tg1():
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        sub_groups = []
        for s_id in range(1, 3):
            @task_group(group_id=f"sub_group{s_id}")
            def tg2():
                st1 = EmptyOperator(task_id="task1")
                st2 = EmptyOperator(task_id="task2")

                st1 >> st2
            sub_groups.append(tg2())

        t1 >> sub_groups >> t2
    groups.append(tg1())

groups[0] >> groups[1]

The following image shows the expanded view of the nested task groups in the Airflow UI:

Nested task groups

Custom task group classes

If you use the same patterns of tasks in several DAGs or Airflow instances, it may be useful to create a custom task group class module. To do so, you need to inherit from the TaskGroup class and then define your tasks within that custom class. You also need to use self to assign the task to the task group. Other than that, the task definitions will be the same as if you were defining them in a DAG file.

from airflow.utils.task_group import TaskGroup
from airflow.decorators import task


class MyCustomMathTaskGroup(TaskGroup):
    """A task group summing two numbers and multiplying the result with 23."""

    # defining defaults of input arguments num1 and num2
    def __init__(self, group_id, num1=0, num2=0, tooltip="Math!", **kwargs):
        """Instantiate a MyCustomMathTaskGroup."""
        super().__init__(group_id=group_id, tooltip=tooltip, **kwargs)

        # assign the task to the task group by using `self`
        @task(task_group=self)
        def task_1(num1, num2):
            """Adds two numbers."""
            return num1 + num2

        @task(task_group=self)
        def task_2(num):
            """Multiplies a number by 23."""
            return num * 23

        # define dependencies
        task_2(task_1(num1, num2))

In the DAG, you import your custom TaskGroup class and instantiate it with the values for your custom arguments:

from airflow.decorators import dag, task
from pendulum import datetime
from include.custom_task_group import MyCustomMathTaskGroup


@dag(
    start_date=datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
    tags=["@task_group", "task_group"],
)
def custom_tg():
    @task
    def get_num_1():
        return 5

    tg1 = MyCustomMathTaskGroup(group_id="my_task_group", num1=get_num_1(), num2=19)

    @task
    def downstream_task():
        return "hello"

    tg1 >> downstream_task()


custom_tg()

The following image shows the custom task group, which can now be reused in other DAGs with different inputs for num1 and num2.

Custom task group
