Use DAG Factory to create DAGs
The DAG Factory is an open source tool managed by Astronomer that allows you to dynamically generate Apache Airflow® DAGs from YAML. While Airflow DAGs are traditionally written exclusively in Python, the DAG Factory makes it easy for people who don't know Python to use Airflow.
This guide includes instructions for installing the DAG Factory package into your Astro project and a sample YAML configuration file that you can use to easily specify the details of your DAG, including its schedule, callbacks, and task names.
The DAG Factory can be used with all Astronomer products and any Apache Airflow installation. To view the source code of the project, see DAG Factory.
Prerequisites
- Python version 3.8.0 or greater
- Apache Airflow version 2.0 or greater
If you're an Astronomer customer, you must have an Astro project with a supported version of Astro Runtime. See Astro Runtime lifecycle schedule for a list of currently supported Runtime versions.
Step 1: Install DAG Factory
To use the DAG Factory, install it as a Python package into your Apache Airflow environment.
If you're an Astronomer customer:
- In your Astro project, open your `requirements.txt` file.
- Add `dag-factory<=1.0.0` to the file.
- Save the changes to your `requirements.txt` file.
If you're not an Astronomer customer, install the Python package according to the standards at your organization:
pip install "dag-factory<=1.0.0"
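(Optional) To confirm that the package is available to Airflow, you can run a quick import check with the same Python interpreter that your Airflow environment uses. This is only a minimal sketch; the printed message is arbitrary.

# Minimal check that the dag-factory package can be imported
import dagfactory

print("dag-factory imported from:", dagfactory.__file__)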
Step 2: Create a sub-directory for your DAG configuration files and create a YAML file for your DAG
- In the `dags` directory of your Astro project, create a new sub-directory called `configs` to store your DAG configuration files defined in YAML. Astronomer recommends keeping these separate from standard DAG files written in Python.
- Within the new sub-directory, create a new YAML file for your DAG, for example `my_dag.yaml`, and copy the contents of the following example configuration file into it.
<your-DAG-id>:
  default_args:
    owner: 'example_owner'
    start_date: 2024-07-01  # or '2 days'
    end_date: 2030-01-01
    retries: 1
    retry_delay_sec: 300
  schedule_interval: '0 3 * * *'
  catchup: False
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 60
  default_view: 'grid'  # or 'graph', 'duration', 'gantt', 'landing_times' (for Run Duration in newer versions)
  orientation: 'LR'  # or 'TB', 'RL', 'BT'
  description: 'This is an example dag!'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 2'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 3'
      dependencies: [task_1]
- Modify the example configuration file with parameters for the DAG you want to create, including replacing `<your-DAG-id>` with a valid `dag_id`. See DAG-level parameters in Airflow to learn more about each parameter.
- (Optional) Delete any configurations that you don't want to specify. For parameters that aren't specified, your DAG assumes the default values that correspond to your current Apache Airflow or Astro Runtime version.
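For reference, the example configuration above corresponds roughly to the following DAG written directly in Python. This is a sketch for comparison only; the DAG Factory generates the DAG for you, so you don't need to create this file. The `dag_id` shown is a placeholder, the modern `BashOperator` import path is used, and some display-related parameters (such as `default_view` and `orientation`) are omitted for brevity.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_dag_id",  # placeholder; use your own dag_id
    description="This is an example dag!",
    schedule_interval="0 3 * * *",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "example_owner",
        "start_date": datetime(2024, 7, 1),
        "end_date": datetime(2030, 1, 1),
        "retries": 1,
        "retry_delay": timedelta(seconds=300),
    },
):
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 2")
    task_3 = BashOperator(task_id="task_3", bash_command="echo 3")

    # task_2 and task_3 both depend on task_1, matching the YAML dependencies
    task_1 >> [task_2, task_3]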
Step 3: Create a DAG Factory file
All YAML files in your `dags` directory must be parsed and converted into Python in order to run on Apache Airflow. In this step, you will create a new DAG Factory file in your Astro project that includes the conversion logic. You only need to do this once; you do not need a separate DAG Factory file for each of your DAGs or YAML files.
- In the `dags` directory of your Astro project, create a new Python file called `dag_factory.py`.
- Copy the following contents into your empty Python file. This code builds the path to your `configs` directory and calls `load_yaml_dags` to convert each of your YAML files into DAGs.
from pathlib import Path
from airflow import DAG
from airflow.configuration import conf as airflow_conf
from dagfactory import load_yaml_dags

# Build the path to the configs sub-directory inside your Airflow dags folder
config_dir = Path(airflow_conf.get("core", "dags_folder")) / "configs"

# Parse every YAML file in that directory and register the resulting DAGs
load_yaml_dags(globals_dict=globals(), dags_folder=config_dir)
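After completing Steps 2 and 3, the relevant part of your Astro project looks like the following. The file names are the examples used in this guide; yours may differ.

dags/
├── configs/
│   └── my_dag.yaml
└── dag_factory.py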
Step 4: (Optional) Add a DAG-level callback
To use DAG-level callbacks, add callback parameters to your configuration file. The values are the names of your callback functions and the paths to the files that contain them.
In `my_dag.yaml`, add the following parameters:
on_success_callback_name: placeholder_callback
on_success_callback_file: /usr/local/airflow/dags/callback_func.py
on_failure_callback_name: placeholder_callback
on_failure_callback_file: /usr/local/airflow/dags/callback_func.py
- Create a new file called `callback_func.py` in your `dags` directory.
- Copy the contents of the following placeholder callback into the file:
def placeholder_callback(context):
    # Airflow passes a context dictionary to DAG-level callbacks
    pass
- Save the file.
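The placeholder above only demonstrates how callbacks are wired up. When a DAG-level callback fires, Airflow passes it a context dictionary, so a slightly more useful callback might log details about the run. The function name `log_dag_outcome` below is illustrative; if you use it, update the `on_success_callback_name` and `on_failure_callback_name` values in your YAML file to match.

import logging

def log_dag_outcome(context):
    # Airflow provides the run context when it invokes the callback
    dag_id = context["dag"].dag_id
    run_id = context.get("run_id")
    logging.info("Callback fired for DAG %s, run %s", dag_id, run_id)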
Generating YAML files dynamically
The example above shows how to use the DAG Factory to create DAGs based on static YAML files. If you want to create several DAGs with a similar structure, you can generate them dynamically from a template YAML file to avoid code duplication. Creating a DAG dynamically with the DAG Factory simply means that you use Python code to create the YAML configurations instead of writing them manually.
There are two files that you need:
- A template YAML file that contains the structure of the DAGs you want to create with placeholders for the values that will change.
- A Python script that creates the DAG Factory YAML file by replacing the placeholders in the template YAML file with the actual values.
The template YAML file provides the structure for all the DAGs you will generate dynamically, with placeholders for the values that vary between DAGs:
<< dag_id >>:
  default_args:
    owner: 'example_owner'
    start_date: "2024-11-01"
    retries: << default_args_retries >>
    retry_delay_sec: 300
  schedule_interval: "<< schedule >>"
  catchup: False
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "<< bash_command_task_1 >>"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "<< bash_command_task_2 >>"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "<< bash_command_task_3 >>"
      dependencies: [task_1]
The Python script reads the template YAML file, replaces the placeholders with the actual values, and writes the resulting YAML configuration to the `configs` sub-directory of your `dags` folder. You can run this script manually to generate your DAGs for local development or automatically as part of your CI/CD pipeline.
"""
generate_dynamic_dag.py
Script that is run to generate a .yml file to be used to source a dag-factory DAG.
"""
import json
import yaml
# provide the path to your template YAML file here
PATH_TO_TEMPLATE_YAML: str = "include/template.yml"
# read the template dag-factory YAML file
with open(PATH_TO_TEMPLATE_YAML, "r") as yaml_file:
    template: dict = yaml.load(yaml_file, yaml.SafeLoader)
# replace the placeholders with the actual values,
# note that << >> is used to identify placeholders here; this is arbitrary and
# you can use any other character or sequence of characters
TEMPLATE_VARIABLES: list = [
    {
        "<< dag_id >>": "business_analytics",
        "<< default_args_retries >>": 1,
        "<< schedule >>": "@daily",
        "<< bash_command_task_1 >>": "echo 'Task 1: Business Analytics'",
        "<< bash_command_task_2 >>": "echo 'Task 2: Business Analytics'",
        "<< bash_command_task_3 >>": "echo 'Task 3: Business Analytics'",
    },
    {
        "<< dag_id >>": "data_science",
        "<< default_args_retries >>": 2,
        "<< schedule >>": "@weekly",
        "<< bash_command_task_1 >>": "echo 'Task 1: Data Science'",
        "<< bash_command_task_2 >>": "echo 'Task 2: Data Science'",
        "<< bash_command_task_3 >>": "echo 'Task 3: Data Science'",
    },
    {
        "<< dag_id >>": "machine_learning",
        "<< default_args_retries >>": 3,
        "<< schedule >>": "0 1 * * *",
        "<< bash_command_task_1 >>": "echo 'Task 1: Machine Learning'",
        "<< bash_command_task_2 >>": "echo 'Task 2: Machine Learning'",
        "<< bash_command_task_3 >>": "echo 'Task 3: Machine Learning'",
    },
]
populated_templates: dict = {}
# Loop through the list of template dictionaries, replace the placeholders,
# and create a JSON object that contains the DAG definitions
for variables in TEMPLATE_VARIABLES:
    populated_template: dict = template.copy()
    populated_template__str: str = json.dumps(populated_template)
    for key, value in variables.items():
        populated_template__str = populated_template__str.replace(key, str(value))
    populated_template = json.loads(populated_template__str)
    populated_template__keys = list(populated_template.keys())
    # check that each DAG is associated with only one dag_id
    if len(populated_template__keys) == 1:
        # assign the DAG id as the key in the JSON object
        populated_templates[populated_template__keys[0]] = populated_template[
            populated_template__keys[0]
        ]
    else:
        raise Exception("Unexpected format from populated template.")
# write the JSON object to a YAML file; this creates one file containing
# the DAG Factory definitions for all DAGs created from the templates.
# Write the file into the configs sub-directory so that the DAG Factory file
# from Step 3 picks it up.
with open("dags/configs/dynamic_etl.yml", "w") as ingestion__config:
    ingestion__config.write(yaml.dump(populated_templates))
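Running the script from the root of your Astro project produces a single file, `dags/configs/dynamic_etl.yml`, containing all three DAG definitions. As a rough sketch, the entry generated from the first set of values in `TEMPLATE_VARIABLES` looks like the following (quoting and key order in the actual `yaml.dump` output may differ slightly):

business_analytics:
  default_args:
    owner: example_owner
    start_date: '2024-11-01'
    retries: 1
    retry_delay_sec: 300
  schedule_interval: '@daily'
  catchup: false
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: echo 'Task 1: Business Analytics'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: echo 'Task 2: Business Analytics'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: echo 'Task 3: Business Analytics'
      dependencies: [task_1]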