Configure worker queues
By default, all tasks using the Celery executor run in a `default` worker queue. You can create additional worker queues to enable multiple worker types or configurations for different groups of tasks, and assign tasks to these queues in your DAG code. For more information about Airflow executors on Astro, see Manage Airflow executors on Astro.
Use worker queues to create optimized execution environments for different types of tasks in the same Deployment. You can use worker queues to:
- Separate resource-intensive tasks, such as those that execute machine learning models, from tasks that require minimal resources, such as those that execute SQL queries.
- Separate short-running tasks from long-running tasks.
- Isolate a single task from other tasks in your Deployment.
- Allow some workers to scale to zero while keeping a minimum of one worker running for other worker types.
Benefits
Worker queues can enhance performance, decrease cost, and increase the reliability of task execution in your Deployment. Specifically:
- Executing a task with dedicated hardware that best fits the needs of that task can result in faster performance. In some cases, this can decrease the duration of a DAG by up to 50%.
- Running larger workers only for the tasks that need them lowers your infrastructure cost, because you don't pay for that capacity when your tasks don't need it.
- Separating tasks with different characteristics makes them less likely to end in a failed or zombie state.
Example
By configuring multiple worker queues and assigning tasks to these queues based on the requirements of the tasks, you can enhance the performance, reliability, and throughput of your Deployment. For example, consider the following scenario:
- You are running Task A and Task B in a Deployment.
- Task A and Task B are dependent on each other, so they need to run in the same Deployment.
- Task A is a long-running task that uses a lot of CPU and memory, while Task B is a short-running task that uses minimal amounts of CPU and memory.
You can assign Task A to a worker queue that is configured to use the A20 worker type, which is optimized for running compute-heavy tasks. Then, you can assign Task B to a worker queue that is configured to use the A5 worker type, which is smaller and optimized for general usage.
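For instance, the following DAG sketch shows how this scenario can look in code. The queue names `heavy-compute` and `light-tasks` are hypothetical; they must match worker queues you created in the Astro UI, backed by A20 and A5 workers respectively.

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def queue_example():
    # Hypothetical queue backed by A20 workers for compute-heavy work.
    @task(queue='heavy-compute')
    def task_a():
        # Long-running, CPU- and memory-intensive work.
        return "model-artifact"

    # Hypothetical queue backed by A5 workers for lightweight work.
    @task(queue='light-tasks')
    def task_b(upstream_result):
        # Short-running work that depends on Task A.
        print(upstream_result)

    task_b(task_a())


queue_example()
```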
Default worker queue
Each Deployment requires a worker queue named `default` to run tasks. Tasks that are not assigned to a worker queue in your DAG code are executed by workers in the `default` worker queue.
You can change all settings of the default worker queue except for its name.
Worker queue settings
You can configure each worker queue on Astro with the following settings:
- Name: The name of your worker queue. Use this name to assign tasks to the worker queue in your DAG code. Worker queue names must consist only of lowercase letters and hyphens. For example, `machine-learning-tasks`, `short-running-tasks`, or `high-cpu`.
- Concurrency: The maximum number of tasks that a single worker can run at a time. If the number of queued and running tasks exceeds this number, a new worker is added to run the remaining tasks. This value is equivalent to worker concurrency in Apache Airflow. The default for your worker type is suitable for most use cases.
- Min # Workers / Max # Workers: The minimum and maximum number of workers that can run at a time. The number of workers autoscales based on Concurrency and the current number of tasks in a `queued` or `running` state, as sketched after this list.
- Worker Type: The size of workers in the worker queue, for example A5 or A20. A worker's total available CPU, memory, and storage is defined by its worker type. For more information, see the following section on hosted worker types.
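The autoscaling behavior can be summarized with the following arithmetic. This is a simplified sketch of the documented scaling rule, not Astro's actual implementation:

```python
import math


def desired_workers(queued: int, running: int, concurrency: int,
                    min_workers: int, max_workers: int) -> int:
    """Enough workers to cover all queued and running tasks at the
    configured concurrency, clamped between Min # Workers and
    Max # Workers."""
    needed = math.ceil((queued + running) / concurrency)
    return max(min_workers, min(needed, max_workers))


# Example: 25 queued + 10 running tasks at a concurrency of 16
# require ceil(35 / 16) = 3 workers.
print(desired_workers(queued=25, running=10, concurrency=16,
                      min_workers=1, max_workers=10))  # 3
```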
Hosted worker types
Each Deployment worker queue has a worker type that determines how many resources are available to your Airflow workers for running tasks. A worker type is a virtualized instance of CPU and memory on your cluster that is specific to the Astro platform. The underlying node instance type running your worker can vary based on how Astro optimizes resource usage on your cluster.
Each virtualized instance of your worker type is a worker. Celery workers can run multiple tasks at once, while Kubernetes workers run only a single task at a time, scaling up and down as needed.
The following table lists all available worker types on Astro Deployments.
| Worker Type | vCPU | Memory | Ephemeral storage | Default task concurrency | Max task concurrency |
|---|---|---|---|---|---|
| A5 | 1 | 2 GiB | 10 GiB | 5 | 15 |
| A10 | 2 | 4 GiB | 10 GiB | 10 | 30 |
| A20 | 4 | 8 GiB | 10 GiB | 20 | 60 |
| A40 | 8 | 16 GiB | 10 GiB | 40 | 120 |
| A60 | 12 | 24 GiB | 10 GiB | 60 | 180 |
| A120 | 24 | 48 GiB | 10 GiB | 120 | 360 |
| A160 | 32 | 64 GiB | 10 GiB | 160 | 480 |
All worker types include 10 GiB of ephemeral storage that your tasks can use to store small amounts of data on the worker.
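For example, a task can write scratch files to the worker's local disk. A minimal sketch; note that ephemeral storage is local to one worker, so data written here is not shared with tasks on other workers and does not survive worker restarts:

```python
import tempfile

from airflow.decorators import task


@task
def write_scratch_data():
    # Scratch files land on the worker's 10 GiB of ephemeral storage.
    # They are local to this worker and are lost when it restarts.
    with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as f:
        f.write(b"a,b\n1,2\n")
    return f.name
```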
Astro Hybrid worker type setup
On Astro Hybrid clusters, worker type is defined as a node instance type that is supported by the cloud provider of your cluster. For example, a worker type might be `m5.2xlarge` or `c6i.4xlarge` for a Deployment running on a Hybrid AWS cluster hosted in your cloud. Actual worker size is equivalent to the total capacity of the worker type minus Astro’s system overhead.
Your Organization can enable up to 10 different worker types for each Hybrid cluster. After a worker type is enabled on an Astro Hybrid cluster, the worker type becomes available to any Deployment in that cluster and appears in the Worker Type menu of the Astro UI.
1. Review the list of supported worker types for your cloud provider. See the AWS, Azure, or GCP resource references.
2. Contact Astronomer support and provide the following information:
   - The name of your cluster.
   - The name of the worker type(s) you want to enable for your cluster. For example, `m6i.2xlarge`.
For more information on requesting cluster changes, see Manage Hybrid clusters.
Create a worker queue
If you prefer, you can also run the `astro deployment worker-queue create` command in the Astro CLI to create a worker queue. See the CLI Command Reference.
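For instance, a command like the following sketch creates a queue backed by A20 workers. The flag names here are illustrative and may differ across CLI versions; confirm them in the CLI Command Reference:

```bash
# Sketch only: verify flag names against the CLI Command Reference.
astro deployment worker-queue create \
  --deployment-name "My Deployment" \
  --name machine-learning-tasks \
  --worker-type A20 \
  --min-count 0 \
  --max-count 10 \
  --concurrency 20
```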
1. In the Astro UI, select a Workspace, click Deployments, and then select a Deployment.
2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.
3. In the Execution section, click Add Queue to create a new worker queue, and then configure its related attributes. Note that you can't change the name of a worker queue after you create it.
4. When you're done creating your worker queue, click Update Deployment to save your changes.
You can create, update, and delete multiple worker queues at once using a Deployment file. See Deployments as Code.
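For example, the worker queue section of a Deployment file might look like the following sketch. The field names reflect the Deployment file schema as commonly documented; confirm them against the template in Deployments as Code:

```yaml
# Sketch of the worker_queues section of a Deployment file.
deployment:
  worker_queues:
    - name: default
      worker_type: A5
      worker_concurrency: 5
      min_worker_count: 1
      max_worker_count: 10
    - name: machine-learning-tasks
      worker_type: A20
      worker_concurrency: 20
      min_worker_count: 0
      max_worker_count: 5
```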
Assign tasks to a worker queue
By default, all tasks run in the default worker queue. To run tasks on a different worker queue, assign the task to the worker queue in your DAG code.
Step 1: Copy the name of the worker queue
- In the Astro UI, select a Workspace and select a Deployment.
- Click the Worker Queues tab.
- Copy the name of the worker queue you want to assign a task to.
Step 2: Assign the task in your DAG code
In your DAG code, add a `queue='<worker-queue-name>'` argument to the definition of the task. If a task is assigned to a queue that does not exist or is not referenced properly, the task might remain in a `queued` state and fail to execute. Make sure that the name of the queue in your DAG code matches the name of the queue in the Astro UI.
Astronomer recommends using Apache Airflow's TaskFlow API to define your task argument. The TaskFlow API, introduced in Airflow 2, includes a `@task` decorator that makes DAGs easier to write. In the following examples, all instances of the task run in the `machine-learning-tasks` queue. Choose an example based on whether or not you use the TaskFlow API.
Classic operator example:

```python
from airflow.operators.python import PythonOperator

# Run this task on workers in the machine-learning-tasks queue.
train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_model_flights,
    queue='machine-learning-tasks',
)
```
TaskFlow API example:

```python
from airflow.decorators import task


# Every instance of this task runs on workers in the
# machine-learning-tasks queue.
@task(task_id='train_model', queue='machine-learning-tasks')
def train_model_flights(x_train, y_train):
    import xgboost
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    # Scale features, then fit a gradient-boosted classifier.
    xgbclf = xgboost.XGBClassifier()
    pipe = Pipeline([('scaler', StandardScaler(with_mean=False)),
                     ('xgbclf', xgbclf)])
    pipe.fit(x_train, y_train)
    return pipe
```
Update a worker queue
If you prefer, you can run the `astro deployment worker-queue update` command in the Astro CLI to update a worker queue. See the CLI Command Reference.
You can update worker queue configurations at any time. The worker queue name can't be changed.
If you need to change the worker type of an existing worker queue, Astronomer recommends making the change at a time when it will not affect production pipelines. After you've changed a worker type, Astronomer recommends waiting a minimum of five minutes before pushing new code to your Deployment.
1. In the Astro UI, select a Workspace, click Deployments, and then select a Deployment.
2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.
3. Expand the Execution section if it is not already expanded.
4. Update the worker queue settings, and then click Update Deployment.
The Airflow components of your Deployment automatically restart to apply the updated resource allocations. This action is equivalent to deploying code to your Deployment and does not interrupt running tasks: workers that are already running tasks have 24 hours to complete them before they are terminated. See What happens during a code deploy.
If you see tasks getting stuck, your worker queue configuration might not align with your Airflow pools, which control execution parallelism. Make sure to update your Airflow pools to match any change in maximum task parallelism caused by your worker queue changes. For more information on limiting parallelism, see Airflow Pools.
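For example, you can cap the parallelism of tasks on a worker queue by assigning them to a matching pool. A minimal sketch, assuming a hypothetical pool named `ml_pool` that you've created and sized in the Airflow UI (Admin > Pools):

```python
from airflow.decorators import task


# 'ml_pool' is a hypothetical pool: create it in the Airflow UI and size
# its slots to match the maximum task parallelism of the
# machine-learning-tasks worker queue.
@task(queue='machine-learning-tasks', pool='ml_pool')
def score_model():
    ...
```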
Delete a worker queue
If you prefer, you can also run the `astro deployment worker-queue delete` command in the Astro CLI to delete a worker queue. See the CLI Command Reference.
All scheduled tasks that are still assigned to a worker queue after the worker queue is deleted remain in a `queued` state indefinitely and won't execute. To avoid stalled task runs, reassign all tasks from a worker queue before deleting it. You can either remove the `queue` argument or assign the task to a different queue.
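For example, before deleting the `machine-learning-tasks` queue, you could update the task definition as in the following sketch so the task falls back to the `default` queue:

```python
from airflow.operators.python import PythonOperator

# With no queue argument, the task runs on the default worker queue.
train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_model_flights,
    # queue='machine-learning-tasks',  # removed before deleting the queue
)
```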
1. In the Astro UI, select a Workspace, click Deployments, and then select a Deployment.
2. Click the Options menu of the Deployment you want to update, and select Edit Deployment.
3. Expand the Execution section if it is not already expanded.
4. Click Remove queue to delete the worker queue, and then click Update Deployment.