Orchestrate Ray jobs with Apache Airflow®
Ray is an open-source framework for scaling Python applications, particularly for machine learning and AI workloads, where it provides a layer for parallel processing and distributed computing. Many large language models (LLMs) are trained using Ray, including OpenAI's GPT models.
The Ray provider package for Apache Airflow® allows you to interact with Ray from your Airflow DAGs. This tutorial demonstrates how to use the Ray provider package to orchestrate a simple Ray job with Airflow in an existing Ray cluster. For more in-depth information, see the Ray provider documentation.
For instructions on how to run Ray jobs on the Anyscale platform with Airflow, see the Orchestrate Ray jobs on Anyscale with Apache Airflow® tutorial.
This tutorial shows a simple implementation of the Ray provider package. For a more complex example, see the Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale reference architecture.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Ray basics. See the Getting Started section of the Ray documentation.
- Airflow operators. See Airflow operators.
Prerequisites
- The Astro CLI.
- A pre-existing Ray cluster. This tutorial uses a local Ray cluster set up with kind, as shown in the RayCluster Quickstart; a sketch of that setup follows this list.
The Ray provider package can also create a Ray cluster for you in an existing Kubernetes cluster. For more information, see the Ray provider package documentation. Note that you need a Kubernetes cluster with a pre-configured LoadBalancer service to use the Ray provider package.
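If you don't have a Ray cluster yet, the following is a minimal sketch of the kind-based setup described in the RayCluster Quickstart. Chart versions and the head service name depend on your KubeRay release, so treat them as placeholders and follow the quickstart for the exact values:

# Create a local Kubernetes cluster with kind
$ kind create cluster

# Install the KubeRay operator with Helm
$ helm repo add kuberay https://ray-project.github.io/kuberay-helm/
$ helm repo update
$ helm install kuberay-operator kuberay/kuberay-operator

# Deploy a RayCluster custom resource using the sample Helm chart
$ helm install raycluster kuberay/ray-cluster

# Expose the Ray dashboard on port 8265 (the service name may differ in your setup)
$ kubectl port-forward service/raycluster-kuberay-head-svc 8265:8265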
Step 1: Configure your Astro project
Use the Astro CLI to create and run an Airflow project on your local machine.
- Create a new Astro project:

  $ mkdir astro-ray-tutorial && cd astro-ray-tutorial
  $ astro dev init

- In the requirements.txt file, add the Ray provider:

  astro-provider-ray==0.2.1

- Run the following command to start your Airflow project:

  astro dev start
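Optionally, you can verify that the provider was installed into your local Airflow environment. A small sketch, assuming the project is running; astro dev bash opens a shell in one of the local Airflow containers:

# Open a shell in the scheduler container of your local Airflow environment
$ astro dev bash

# Inside the container, confirm the Ray provider package is installed
$ pip list | grep astro-provider-ray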
Step 2: Configure a Ray connection
For Astro customers, Astronomer recommends using the Astro Environment Manager to store connections in an Astro-managed secrets backend. These connections can be shared across multiple deployed and local Airflow environments. See Manage Astro connections in branch-based deploy workflows.
- In the Airflow UI, go to Admin -> Connections and click +.

- Create a new connection and choose the Ray connection type. Enter the following information:

  - Connection ID: ray_conn
  - Ray dashboard url: your Ray dashboard URL, for example http://kind-control-plane:8265.
  - For this local tutorial, select the Disable SSL checkbox.

- Click Save.
If you are connecting to a Ray cluster running on a cloud provider, you need to provide the .kubeconfig file of the Kubernetes cluster where the Ray cluster is running as Kube config (JSON format), as well as valid cloud credentials as environment variables.
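Alternatively, for local development you can define the same connection as an environment variable in your project's .env file. This is only a sketch: the AIRFLOW_CONN_<CONN_ID> pattern is standard Airflow, but the conn_type value and the field that holds the dashboard URL are assumptions here; check the Ray provider documentation for the exact connection fields.

# .env (sketch only; the "ray" conn_type and "host" field are assumptions, verify against the Ray provider docs)
AIRFLOW_CONN_RAY_CONN='{"conn_type": "ray", "host": "http://kind-control-plane:8265"}'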
Step 3: Write a DAG to orchestrate Ray jobs
- Create a new file in your dags directory called ray_tutorial.py.

- Copy and paste the code below into the file. The code is shown in two versions, one using the TaskFlow API and one using traditional operator syntax.

TaskFlow
"""
## Ray Tutorial
This tutorial demonstrates how to use the Ray provider in Airflow to parallelize
a task using Ray.
"""
from airflow.decorators import dag, task
from ray_provider.decorators.ray import ray
CONN_ID = "ray_conn"
RAY_TASK_CONFIG = {
"conn_id": CONN_ID,
"num_cpus": 1,
"num_gpus": 0,
"memory": 0,
"poll_interval": 5,
}
@dag(
start_date=None,
schedule=None,
catchup=False,
tags=["ray", "example"],
doc_md=__doc__,
)
def test_taskflow():
@task
def generate_data() -> list:
"""
Generate sample data
Returns:
list: List of integers
"""
import random
return [random.randint(1, 100) for _ in range(10)]
# use the @ray.task decorator to parallelize the task
@ray.task(config=RAY_TASK_CONFIG)
def get_mean_squared_value(data: list) -> float:
"""
Get the mean squared value from a list of integers
Args:
data (list): List of integers
Returns:
float: Mean value of the list
"""
import numpy as np
import ray
@ray.remote
def square(x: int) -> int:
"""
Square a number
Args:
x (int): Number to square
Returns:
int: Squared number
"""
return x**2
ray.init()
data = np.array(data)
futures = [square.remote(x) for x in data]
results = ray.get(futures)
mean = np.mean(results)
print(f"Mean squared value: {mean}")
data = generate_data()
get_mean_squared_value(data)
test_taskflow()"""
## Ray Tutorial
This tutorial demonstrates how to use the Ray provider in Airflow to
parallelize a task using Ray.
"""
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from ray_provider.operators.ray import SubmitRayJob
from airflow.models.baseoperator import chain
from pathlib import Path
CONN_ID = "ray_conn"
FOLDER_PATH = Path(__file__).parent
RAY_RUNTIME_ENV = {"working_dir": str(FOLDER_PATH)}
def _generate_data() -> list:
    """
    Generate sample data
    Returns:
        list: List of integers
    """
    import random

    return [random.randint(1, 100) for _ in range(10)]


@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
    doc_md=__doc__,
)
def ray_tutorial():
    data = PythonOperator(
        task_id="generate_data",
        python_callable=_generate_data,
    )

    get_mean_squared_value = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id=CONN_ID,
        entrypoint="python ray_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
        runtime_env=RAY_RUNTIME_ENV,
        num_cpus=1,
        num_gpus=0,
        memory=0,
        resources={},
        xcom_task_key="SubmitRayJob.dashboard",
        fetch_logs=True,
        wait_for_completion=True,
        job_timeout_seconds=600,
        poll_interval=5,
    )

    chain(data, get_mean_squared_value)


ray_tutorial()

This is a simple DAG consisting of two tasks:
- The generate_data task randomly generates a list of 10 integers.
- The get_mean_squared_value task submits a Ray job to your Ray cluster to calculate the mean squared value of the list of integers.
- (Optional) If you are using the traditional syntax with the SubmitRayJob operator, you need to provide the Python code to run in the Ray job as a script. Create a new file in your dags directory called ray_script.py and add the following code:

# ray_script.py
import numpy as np
import ray
import argparse


@ray.remote
def square(x):
    return x**2


def main(data):
    ray.init()
    data = np.array(data)
    futures = [square.remote(x) for x in data]
    results = ray.get(futures)
    mean = np.mean(results)
    print(f"Mean of this population is {mean}")
    return mean


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process some integers.")
    parser.add_argument('data', nargs='+', type=float, help='List of numbers to process')
    args = parser.parse_args()
    data = args.data
    main(data)
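If you have Ray and NumPy installed locally, you can optionally sanity-check the script outside of Airflow by passing it a few numbers on the command line; it should print the mean of their squares. This quick test is not part of the DAG run:

$ python ray_script.py 3 5 7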
Step 4: Run the DAG
- In the Airflow UI, click the play button to manually run your DAG.

- After the DAG runs successfully, go to your Ray dashboard to see the job submitted by Airflow.
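You can also trigger a DAG from the command line instead of the UI. A minimal sketch, assuming the local Astro project from Step 1 is running and using the dag_id of the TaskFlow version shown above (test_taskflow); astro dev run forwards the command to the Airflow CLI in your local environment:

# Unpause and trigger the TaskFlow version of the DAG by its dag_id
$ astro dev run dags unpause test_taskflow
$ astro dev run dags trigger test_taskflow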
Conclusion
Congratulations! You've run a Ray job using Apache Airflow. You can now use the Ray provider package to orchestrate more complex Ray jobs. For an example, see Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale.