Orchestrate Ray jobs on Anyscale with Apache Airflow®
Anyscale is a compute platform for AI/ML workloads built on the open-source Ray framework, which provides the layer for parallel processing and distributed computing. The Anyscale provider package for Apache Airflow® allows you to interact with Anyscale from your Airflow DAGs. This tutorial shows a simple example of how to use the Anyscale provider package to orchestrate Ray jobs on Anyscale with Airflow. For more in-depth information, see the Anyscale provider documentation.
For instructions on how to run open-source Ray jobs with Airflow, see the Orchestrate Ray jobs with Apache Airflow® tutorial.
This tutorial shows a simple implementation of the Anyscale provider package. For a more complex example, see the Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale reference architecture.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Ray basics. See the Getting Started section of the Ray documentation.
- Anyscale basics. See the Get started section of the Anyscale documentation.
- Airflow operators. See Airflow operators.
Prerequisites
- The Astro CLI.
- An Anyscale account with AI platform features enabled. You also need to have at least one suitable image and compute config available in your Anyscale account.
Step 1: Configure your Astro project
Use the Astro CLI to create and run an Airflow project on your local machine.
1. Create a new Astro project:

   ```sh
   $ mkdir astro-anyscale-tutorial && cd astro-anyscale-tutorial
   $ astro dev init
   ```

2. In the `requirements.txt` file, add the Anyscale provider:

   ```text
   astro-provider-anyscale==1.0.1
   ```

3. Run the following command to start your Airflow project:

   ```sh
   astro dev start
   ```
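Your Astro project now contains, among other files, a `dags` directory and the `requirements.txt` you just edited. An abridged view of the files this tutorial touches (the exact scaffold `astro dev init` generates can vary by CLI version):

```text
astro-anyscale-tutorial/
├── dags/                # DAG files and the Ray script go here
├── Dockerfile
└── requirements.txt     # contains astro-provider-anyscale==1.0.1
```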
Step 2: Configure a Ray connection
For Astro customers, Astronomer recommends taking advantage of the Astro Environment Manager to store connections in an Astro-managed secrets backend. These connections can be shared across multiple deployed and local Airflow environments. See Manage Astro connections in branch-based deploy workflows.
1. In the Airflow UI, go to Admin -> Connections and click +.

2. Create a new connection and choose the Anyscale connection type. Enter the following information:

   - Connection ID: `anyscale_conn`
   - API Key: Your Anyscale API key

3. Click Save.
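Alternatively, for local development you can define the connection as an environment variable in your Astro project's `.env` file, using Airflow's standard `AIRFLOW_CONN_<CONN_ID>` JSON format. This is a minimal sketch, assuming the Anyscale connection type reads the API key from the connection's password field:

```text
# .env -- restart your project with `astro dev restart` after editing
AIRFLOW_CONN_ANYSCALE_CONN='{"conn_type": "anyscale", "password": "<your Anyscale API key>"}'
```

Connections defined this way don't appear in the Airflow UI, but tasks can reference them through the same connection ID.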
Step 3: Write a DAG to orchestrate Anyscale jobs
1. Create a new file in your `dags` directory called `anyscale_script.py` and add the following code:

   ```python
   # anyscale_script.py
   import argparse

   import numpy as np
   import ray


   @ray.remote
   def square(x):
       return x**2


   def main(data):
       ray.init()
       data = np.array(data)
       futures = [square.remote(x) for x in data]
       results = ray.get(futures)
       mean = np.mean(results)
       print(f"Mean squared value: {mean}")
       return mean


   if __name__ == "__main__":
       parser = argparse.ArgumentParser(description="Process some integers.")
       parser.add_argument(
           "data", nargs="+", type=float, help="List of numbers to process"
       )
       args = parser.parse_args()
       data = args.data
       main(data)
   ```
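   Optionally, you can sanity-check the script locally before submitting it to Anyscale, assuming `ray` and `numpy` are installed in your local environment (the input numbers below are arbitrary):

   ```sh
   $ python anyscale_script.py 1 2 3
   Mean squared value: 4.666666666666667
   ```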
2. Create a new file in your `dags` directory called `anyscale_tutorial.py`.
3. Copy and paste the code below into the file:

   ```python
   """
   ## Anyscale Tutorial

   This tutorial demonstrates how to use the Anyscale provider in Airflow to
   parallelize a task using Ray on Anyscale.
   """

   from pathlib import Path

   from airflow.decorators import dag
   from airflow.models.baseoperator import chain
   from airflow.operators.python import PythonOperator

   from anyscale_provider.operators.anyscale import SubmitAnyscaleJob

   CONN_ID = "anyscale_conn"
   FOLDER_PATH = Path(__file__).parent


   def _generate_data() -> list:
       """
       Generate sample data

       Returns:
           list: List of integers
       """
       import random

       return [random.randint(1, 100) for _ in range(10)]


   @dag(
       start_date=None,
       schedule=None,
       catchup=False,
       tags=["ray", "example"],
       doc_md=__doc__,
   )
   def anyscale_tutorial():
       data = PythonOperator(
           task_id="generate_data",
           python_callable=_generate_data,
       )

       get_mean_squared_value = SubmitAnyscaleJob(
           task_id="SubmitRayJob",
           conn_id=CONN_ID,
           name="AstroJob",
           image_uri="< your image uri >",  # e.g. "anyscale/ray:2.35.0-slim-py312-cpu"
           compute_config="< your compute config >",  # e.g. airflow-integration-testing:1
           entrypoint="python anyscale_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
           working_dir=str(FOLDER_PATH),  # the folder containing the script
           requirements=["requests", "pandas", "numpy", "torch"],
           max_retries=1,
           job_timeout_seconds=3000,
           poll_interval=30,
       )

       chain(data, get_mean_squared_value)


   anyscale_tutorial()
   ```

   - The `generate_data` task randomly generates a list of 10 integers.
   - The `get_mean_squared_value` task submits a Ray job on Anyscale to calculate the mean squared value of the list of integers.
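At runtime, Airflow renders the Jinja template in the `entrypoint` argument before the job is submitted. For example, if `generate_data` returned `[4, 9, 16]`, the command Anyscale runs would be:

```text
python anyscale_script.py 4 9 16
```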
Step 4: Run the DAG
1. In the Airflow UI, click the play button to manually run your DAG.

2. After the DAG runs successfully, check your Anyscale account to see the job submitted by Airflow.
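If you prefer the command line, you can also trigger the DAG with the Airflow CLI through the Astro CLI, which executes Airflow commands inside your local Airflow environment. A minimal sketch, assuming the environment from `astro dev start` is still running:

```sh
$ astro dev run dags trigger anyscale_tutorial
```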
Conclusion
Congratulations! You've run a Ray job on Anyscale using Apache Airflow®. You can now use the Anyscale provider package to orchestrate more complex jobs. For an example, see Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale.