# Run the Kubernetes executor on Astronomer Software
The Kubernetes executor creates individual Pods that dynamically delegate work and resources to individual tasks. For each task that needs to run, the executor calls the Kubernetes API to dynamically launch a Pod, which terminates when the task completes.

Because each task gets its own Pod, your Kubernetes resource usage scales with how many Airflow tasks you're running at a given time. It also means you can configure the following for each individual Airflow task:
- Memory allocation
- Service accounts
- Airflow image
To configure these resources for a given task's Pod, specify a `pod_override` in your DAG code. To apply a Pod template to many or all of your tasks, you can write a helper function that constructs a `pod_override` in your DAGs, or configure a global setting. For more information on configuring Pod template values, see the Kubernetes documentation.
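For example, such a helper might look like the following minimal sketch. The function name, parameters, and default values here are hypothetical and not part of any Astronomer or Airflow API; only the `pod_override` key and the `kubernetes.client` models match the pattern used later in this guide.

```python
from kubernetes.client import models as k8s


# Hypothetical helper: the name, parameters, and defaults are illustrative.
def make_executor_config(image=None, service_account=None, memory="512Mi"):
    """Build an executor_config dict with a pod_override for one task."""
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                service_account_name=service_account,  # None keeps the default
                containers=[
                    k8s.V1Container(
                        name="base",  # "base" is the main Airflow task container
                        image=image,  # None keeps the Deployment's Airflow image
                        resources=k8s.V1ResourceRequirements(
                            requests={"memory": memory},
                            limits={"memory": memory},
                        ),
                    )
                ],
            )
        )
    }
```

Any operator or `@task`-decorated function that needs non-default resources could then pass, for example, `executor_config=make_executor_config(memory="1Gi")`.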
## Prerequisites
You must have an Airflow Deployment on Astronomer running with the Kubernetes executor. For more information on configuring an executor, see Configure a Deployment. To learn more about different executor types, see Airflow executors explained.
## Configure the default worker Pod for all Deployments
By default, the Kubernetes executor launches workers based on a `podTemplate` configuration in the Astronomer Airflow Helm chart. You can modify the default `podTemplate` to configure the default worker Pods for all Deployments that use the Kubernetes executor on your Astronomer Software installation. You can then override this default at the task level using a `pod_override` configuration. See Configure the worker Pod for a specific task.
1. In your `values.yaml` file, copy the complete `podTemplate` configuration from your version of the Astronomer Airflow Helm chart. Your file should look like the following:

    ```yaml
    astronomer:
      houston:
        config:
          deployments:
            helm:
              airflow:
                podTemplate: |
                  # Licensed to the Apache Software Foundation (ASF) under one
                  # or more contributor license agreements. See the NOTICE file
                  # distributed with this work for additional information
                  # regarding copyright ownership. The ASF licenses this file
                  # to you under the Apache License, Version 2.0 (the
                  # "License"); you may not use this file except in compliance
                  # with the License. You may obtain a copy of the License at
                  #
                  #   http://www.apache.org/licenses/LICENSE-2.0
                  #
                  # Unless required by applicable law or agreed to in writing,
                  # software distributed under the License is distributed on an
                  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
                  # KIND, either express or implied. See the License for the
                  # specific language governing permissions and limitations
                  # under the License.
                  ---
                  {{- $nodeSelector := or .Values.nodeSelector .Values.workers.nodeSelector }}
                  {{- $affinity := or .Values.affinity .Values.workers.affinity }}
                  {{- $tolerations := or .Values.tolerations .Values.workers.tolerations }}
                  apiVersion: v1
                  kind: Pod
                  metadata:
                    name: astronomer-pod-template-file
                    labels:
                      tier: airflow
                      component: worker
                      release: {{ .Release.Name }}
                  {{- with .Values.labels }}
                  {{ toYaml . | indent 4 }}
                  {{- end }}
                    {{- if .Values.airflowPodAnnotations }}
                    annotations:
                      {{- toYaml .Values.airflowPodAnnotations | nindent 4 }}
                    {{- end }}
                  spec:
                    {{- if or (and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled)) .Values.workers.extraInitContainers }}
                    initContainers:
                      {{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}
                      {{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 4 }}
                      {{- end }}
                      {{- if .Values.workers.extraInitContainers }}
                      {{- toYaml .Values.workers.extraInitContainers | nindent 4 }}
                      {{- end }}
                    {{- end }}
                    containers:
                      - args: []
                        command: []
                        envFrom:
                        {{- include "custom_airflow_environment_from" . | default "\n  []" | indent 6 }}
                        env:
                          - name: AIRFLOW__CORE__EXECUTOR
                            value: LocalExecutor
                        {{- include "standard_airflow_environment" . | indent 6 }}
                        {{- include "custom_airflow_environment" . | indent 6 }}
                        image: {{ template "pod_template_image" . }}
                        imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
                        name: base
                        ports: []
                        volumeMounts:
                          - mountPath: {{ template "airflow_logs" . }}
                            name: logs
                          - name: config
                            mountPath: {{ template "airflow_config_path" . }}
                            subPath: airflow.cfg
                            readOnly: true
                          {{- if .Values.airflowLocalSettings }}
                          - name: config
                            mountPath: {{ template "airflow_local_setting_path" . }}
                            subPath: airflow_local_settings.py
                            readOnly: true
                          {{- end }}
                          {{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
                          {{- include "airflow_dags_mount" . | nindent 8 }}
                          {{- end }}
                          {{- if .Values.workers.extraVolumeMounts }}
                  {{ toYaml .Values.workers.extraVolumeMounts | indent 8 }}
                          {{- end }}
                      {{- if .Values.workers.extraContainers }}
                      {{- toYaml .Values.workers.extraContainers | nindent 4 }}
                      {{- end }}
                    hostNetwork: false
                    {{- if or .Values.registry.secretName .Values.registry.connection }}
                    imagePullSecrets:
                      - name: {{ template "registry_secret" . }}
                    {{- end }}
                    restartPolicy: Never
                    securityContext:
                      runAsUser: {{ .Values.uid }}
                      fsGroup: {{ .Values.gid }}
                    nodeSelector: {{ toYaml $nodeSelector | nindent 4 }}
                    affinity: {{ toYaml $affinity | nindent 4 }}
                    tolerations: {{ toYaml $tolerations | nindent 4 }}
                    serviceAccountName: {{ include "worker.serviceAccountName" . }}
                    volumes:
                    {{- if .Values.dags.persistence.enabled }}
                    - name: dags
                      persistentVolumeClaim:
                        claimName: {{ template "airflow_dags_volume_claim" . }}
                    {{- else if .Values.dags.gitSync.enabled }}
                    - name: dags
                      emptyDir: {}
                    {{- end }}
                    {{- if .Values.logs.persistence.enabled }}
                    - name: logs
                      persistentVolumeClaim:
                        claimName: {{ template "airflow_logs_volume_claim" . }}
                    {{- else }}
                    - emptyDir: {}
                      name: logs
                    {{- end }}
                    {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
                    {{- include "git_sync_ssh_key_volume" . | nindent 2 }}
                    {{- end }}
                    - configMap:
                        name: {{ include "airflow_config" . }}
                      name: config
                    {{- if .Values.workers.extraVolumes }}
                  {{ toYaml .Values.workers.extraVolumes | nindent 2 }}
                    {{- end }}
    ```
2. Customize the `podTemplate` configuration based on your use case, such as by setting default requests and limits on CPU and memory for all worker Pods; see the sketch after these steps. For more information on the values you can configure, see the Kubernetes documentation.
3. Push the configuration change to your platform. See Apply a config change.
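As a sketch of step 2, the following excerpt shows one place default resource requests and limits could be added: a `resources` block on the `base` container inside the copied `podTemplate`. All values below are placeholders rather than recommendations:

```yaml
containers:
  - args: []
    command: []
    name: base
    # Added: default resources for every worker Pod. The values are
    # placeholders; size them for your own workloads.
    resources:
      requests:
        cpu: 250m
        memory: 512Mi
      limits:
        cpu: "1"
        memory: 1Gi
    # ...the rest of the base container configuration stays unchanged...
```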
## Configure the worker Pod for a specific task
For each task run with the Kubernetes executor, you can customize the task's individual worker Pod and override the defaults used in Astronomer Software by configuring a `pod_override` in the task's DAG file.
1. Add the following import to your DAG file:

    ```python
    from kubernetes.client import models as k8s
    ```

2. Add a `pod_override` configuration to the DAG file containing the task. See the kubernetes-client GitHub repository for a list of all possible settings you can include in the configuration.

3. Specify the `pod_override` in the task's parameters.
### Example: Set CPU or memory limits and requests
One of the most common use cases for customizing a Kubernetes worker Pod is to request a specific amount of resources for a task. The following example shows how you can use a `pod_override` configuration in your DAG code to request custom resources for a task:
```python
import pendulum
import time

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.example_dags.libs.helper import print_stuff
from kubernetes.client import models as k8s

# Request and limit CPU and memory for the "base" container, which is the
# container that runs the Airflow task.
k8s_exec_config_resource_requirements = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(
                        requests={"cpu": 0.5, "memory": "1024Mi"},
                        limits={"cpu": 0.5, "memory": "1024Mi"},
                    ),
                )
            ]
        )
    )
}

with DAG(
    dag_id="example_kubernetes_executor_pod_override_sources",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
):
    BashOperator(
        task_id="bash_resource_requirements_override_example",
        bash_command="echo hi",
        executor_config=k8s_exec_config_resource_requirements,
    )

    @task(executor_config=k8s_exec_config_resource_requirements)
    def resource_requirements_override_example():
        print_stuff()
        time.sleep(60)

    resource_requirements_override_example()
```
When this DAG runs, it launches a Kubernetes Pod with exactly 0.5 CPUs (500m) and 1024Mi of memory, as long as that infrastructure is available in your cluster. Once the task finishes, the Pod terminates gracefully.
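If you want to see this happen, you can watch the Pods in your Deployment's namespace while the DAG runs. This is a generic kubectl invocation; replace `<deployment-namespace>` with the namespace of your Deployment:

```bash
# Worker Pods appear while tasks run and terminate when the tasks finish.
kubectl get pods -n <deployment-namespace> --watch
```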
## Mount secret environment variables to worker Pods
Deployment environment variables marked as secrets are stored in a Kubernetes secret called `<release-name>-env` in your Deployment's namespace. To use a secret value in a task running with the Kubernetes executor, mount the secret to the Pod running the task.
1. Run the following command to find the namespace (release name) of your Airflow Deployment:

    ```bash
    kubectl get ns
    ```

2. Add the following imports to your DAG file:

    ```python
    from airflow.configuration import conf
    from airflow.kubernetes.secret import Secret
    ```

3. Define a Kubernetes `Secret` in your DAG instantiation using the following format:

    ```python
    secret_env = Secret(deploy_type="env", deploy_target="<SECRET_KEY>", secret="<release-name>-env", key="<SECRET_KEY>")
    namespace = conf.get("kubernetes", "namespace")
    ```

4. Specify the `Secret` in the `secret_key_ref` section of your `pod_override` configuration.

5. In the task where you want to use the secret value, add the following task-level argument:

    ```python
    op_kwargs={
        "env_name": secret_env.deploy_target
    },
    ```

6. In the executable for the task, call the secret value using `os.environ[env_name]`.
In the following example, a secret named `MY_SECRET` is pulled from the `infrared-photon-7780-env` secret and printed to the logs.
```python
import pendulum

from kubernetes.client import models as k8s
from airflow.configuration import conf
from airflow.kubernetes.secret import Secret
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python import PythonOperator


def print_env(env_name):
    import os

    print(os.environ[env_name])


with DAG(
    dag_id="test-secret",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    end_date=pendulum.datetime(2022, 1, 5, tz="UTC"),
    schedule="@once",
) as dag:
    # Pull the key MY_SECRET from the Deployment's Kubernetes secret.
    secret_env = Secret(
        deploy_type="env",
        deploy_target="MY_SECRET",
        secret="infrared-photon-7780-env",
        key="MY_SECRET",
    )
    namespace = conf.get("kubernetes", "namespace")

    p = PythonOperator(
        python_callable=print_env,
        op_kwargs={"env_name": secret_env.deploy_target},
        task_id="test-py-env",
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(
                                    name=secret_env.deploy_target,
                                    value_from=k8s.V1EnvVarSource(
                                        secret_key_ref=k8s.V1SecretKeySelector(
                                            name=secret_env.secret,
                                            key=secret_env.key,
                                        )
                                    ),
                                )
                            ],
                        )
                    ]
                )
            ),
        },
    )
```