Use MLflow with Apache Airflow
MLflow is a popular tool for tracking and managing machine learning models. It can be used together with Airflow for ML orchestration (MLOx), leveraging both tools for what they do best. In this tutorial, you’ll learn about three different ways you can use MLflow with Airflow.
There are multiple resources for learning about this topic. See also:
- Use case: Predict possum tail length using MLflow, Airflow, and linear regression including a ready-to-use example repository.
Three ways to use MLflow with Airflow
The DAG in this tutorial shows three different ways Airflow can interact with MLflow:
- Use an MLflow operator from the MLflow Airflow provider. The MLflow provider contains several operators that abstract over common actions you might want to perform in MLflow, such as creating a deployment with the CreateDeploymentOperator or running predictions from an existing model with the ModelLoadAndPredictOperator (see the sketch after this list).
- Use an MLflow hook from the MLflow Airflow provider. The MLflow provider contains several Airflow hooks that allow you to connect to MLflow using credentials stored in an Airflow connection. You can use these hooks if you need to perform actions in MLflow for which no dedicated operator exists. You can also use these hooks to create your own custom operators.
- Use the MLflow Python package directly in a @task decorated task. The MLflow Python package contains functionality like tracking metrics and artifacts with `mlflow.sklearn.autolog`. You can use this package to write custom Airflow tasks for ML-related actions like feature engineering.
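The tutorial DAG below demonstrates the hook, the Python package, and one of the registry operators. For the first approach, a prediction task using the ModelLoadAndPredictOperator might look roughly like the following sketch. The import path and the `model_uri` and `data` parameters are assumptions about the provider's interface rather than something shown in this tutorial, so check the provider documentation for the exact signature.

```python
# Sketch only: the import path and parameter names below are assumptions about
# the MLflow provider's interface -- verify them against the
# airflow-provider-mlflow documentation before relying on this pattern.
import pandas as pd

from mlflow_provider.operators.pyfunc import ModelLoadAndPredictOperator

run_prediction = ModelLoadAndPredictOperator(
    task_id="run_prediction",
    mlflow_conn_id="mlflow_default",
    # URI of a model previously logged to MLflow, for example an artifact in
    # the object storage bucket connected to your MLflow instance
    model_uri="s3://<your-bucket-name>/<run-id>/artifacts/model",
    # Feature data to predict on; a real DAG would pass the output of an
    # upstream feature-engineering task instead of a hard-coded DataFrame
    data=pd.DataFrame({"feature_1": [0.5], "feature_2": [1.2]}),
)
```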
Time to complete
This tutorial takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of MLflow. See MLflow Concepts.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow hooks. See Hooks 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
Prerequisites
- The Astro CLI.
- An MLflow instance. This tutorial uses a local instance.
- An object storage service connected to your MLflow instance. This tutorial uses MinIO.
Step 1: Configure your Astro project
- Create a new Astro project:

```sh
$ mkdir astro-mlflow-tutorial && cd astro-mlflow-tutorial
$ astro dev init
```

- Add the following packages to your `packages.txt` file:

```text
git
gcc
gcc python3-dev
```

- Add the following packages to your `requirements.txt` file:

```text
airflow-provider-mlflow==1.1.0
mlflow-skinny==2.3.2
```
Step 2: Configure your Airflow connection
To connect Airflow to your MLflow instance, you need to create a connection in Airflow.
- Run `astro dev start` in your Astro project to start up Airflow and open the Airflow UI at `localhost:8080`.
- In the Airflow UI, go to Admin -> Connections and click +.
- Create a new connection named `mlflow_default` and choose the `HTTP` connection type. Enter the following values to create a connection to a local MLflow instance:
  - Connection ID: `mlflow_default`
  - Connection Type: `HTTP`
  - Host: `http://host.docker.internal`
  - Port: `5000`

If you are using a remote MLflow instance, enter your MLflow instance URL as the Host and your username and password as the Login and Password in the connection. If you are running your MLflow instance via Databricks, enter your Databricks URL as the Host, `token` as the Login, and your Databricks personal access token as the Password.
When you test the connection from the Airflow UI, please note that the Test button might return a 405 error message even if your credentials are correct.
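Before building the full pipeline, you can optionally confirm that Airflow can reach your MLflow instance through this connection. One way is a small throwaway DAG that calls the MLflow REST API with the MLflowClientHook from the provider package, the same hook used in the tutorial DAG below. This is a minimal sketch and assumes your MLflow version exposes the 2.0 REST endpoint for searching experiments.

```python
from airflow.decorators import dag, task
from pendulum import datetime

from mlflow_provider.hooks.client import MLflowClientHook


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def mlflow_connection_check():
    @task
    def search_experiments():
        # Uses the mlflow_default connection created above. The search endpoint
        # is part of the MLflow 2.0 REST API; adjust it if your MLflow version
        # exposes a different endpoint.
        mlflow_hook = MLflowClientHook(mlflow_conn_id="mlflow_default")
        response = mlflow_hook.run(
            endpoint="api/2.0/mlflow/experiments/search",
            request_params={"max_results": 5},
        ).json()
        print(response)

    search_experiments()


mlflow_connection_check()
```

If the task succeeds and logs at least the default experiment that ships with a fresh MLflow instance, the connection is set up correctly.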
Step 3: Create your DAG
- In your `dags` folder, create a file called `mlflow_tutorial_dag.py`.
- Copy the following code into the file. Make sure to provide the name of a bucket in your object storage that is connected to your MLflow instance to the `ARTIFACT_BUCKET` variable.

```python
"""
### Show three ways to use MLFlow with Airflow

This DAG shows how you can use the MLflowClientHook to create an experiment in MLFlow,
directly log metrics and parameters to MLFlow in a TaskFlow task via the mlflow Python package, and
create a new model using the CreateRegisteredModelOperator of the MLflow Airflow provider package.
"""

from airflow.decorators import dag, task
from pendulum import datetime
from astro.dataframes.pandas import DataFrame

from mlflow_provider.hooks.client import MLflowClientHook
from mlflow_provider.operators.registry import CreateRegisteredModelOperator

# Adjust these parameters
EXPERIMENT_ID = 1
ARTIFACT_BUCKET = "<your-bucket-name>"

## MLFlow parameters
MLFLOW_CONN_ID = "mlflow_default"
EXPERIMENT_NAME = "Housing"
REGISTERED_MODEL_NAME = "my_model"


@dag(
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def mlflow_tutorial_dag():
    # 1. Use a hook from the MLFlow provider to interact with MLFlow within a TaskFlow task
    @task
    def create_experiment(experiment_name, artifact_bucket, **context):
        """Create a new MLFlow experiment with a specified name.
        Save artifacts to the specified S3 bucket."""

        ts = context["ts"]

        mlflow_hook = MLflowClientHook(mlflow_conn_id=MLFLOW_CONN_ID)
        new_experiment_information = mlflow_hook.run(
            endpoint="api/2.0/mlflow/experiments/create",
            request_params={
                "name": ts + "_" + experiment_name,
                "artifact_location": f"s3://{artifact_bucket}/",
            },
        ).json()

        return new_experiment_information

    # 2. Use mlflow.sklearn autologging in a TaskFlow task
    @task
    def scale_features(experiment_id: str):
        """Track feature scaling by sklearn in Mlflow."""
        from sklearn.datasets import fetch_california_housing
        from sklearn.preprocessing import StandardScaler
        import mlflow
        import pandas as pd

        df = fetch_california_housing(download_if_missing=True, as_frame=True).frame

        mlflow.sklearn.autolog()

        target = "MedHouseVal"
        X = df.drop(target, axis=1)
        y = df[target]

        scaler = StandardScaler()

        with mlflow.start_run(experiment_id=experiment_id, run_name="Scaler") as run:
            X = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)
            mlflow.sklearn.log_model(scaler, artifact_path="scaler")
            mlflow.log_metrics(pd.DataFrame(scaler.mean_, index=X.columns)[0].to_dict())

        X[target] = y

    # 3. Use an operator from the MLFlow provider to interact with MLFlow directly
    create_registered_model = CreateRegisteredModelOperator(
        task_id="create_registered_model",
        name="{{ ts }}" + "_" + REGISTERED_MODEL_NAME,
        tags=[
            {"key": "model_type", "value": "regression"},
            {"key": "data", "value": "housing"},
        ],
    )

    (
        create_experiment(
            experiment_name=EXPERIMENT_NAME, artifact_bucket=ARTIFACT_BUCKET
        )
        >> scale_features(experiment_id=EXPERIMENT_ID)
        >> create_registered_model
    )


mlflow_tutorial_dag()
```

This DAG consists of three tasks, each showing a different way to use MLflow with Airflow.
- The `create_experiment` task creates a new experiment in MLflow by using the MLflowClientHook in a TaskFlow API task. The MLflowClientHook is one of several hooks in the MLflow provider that contain abstractions over calls to the MLflow API (see the custom operator sketch after this list).
- The `scale_features` task uses the mlflow package in a Python decorated task with scikit-learn to log information about the scaler to MLflow. This functionality is not included in any modules of the MLflow provider, so a custom Python function is the best way to implement this task.
- The `create_registered_model` task uses the CreateRegisteredModelOperator to register a new model in your MLflow instance.
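Because the MLflowClientHook only wraps a REST call, you can also package calls you reuse often into your own operator, as mentioned in the overview. The class below is an illustrative sketch, not part of the MLflow provider; it wraps the same experiment-creation endpoint used in the `create_experiment` task.

```python
from airflow.models.baseoperator import BaseOperator

from mlflow_provider.hooks.client import MLflowClientHook


class CreateMLflowExperimentOperator(BaseOperator):
    """Illustrative custom operator that creates an MLflow experiment through
    the MLflowClientHook. Not part of the MLflow provider package."""

    template_fields = ("experiment_name", "artifact_location")

    def __init__(
        self,
        experiment_name: str,
        artifact_location: str,
        mlflow_conn_id: str = "mlflow_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.experiment_name = experiment_name
        self.artifact_location = artifact_location
        self.mlflow_conn_id = mlflow_conn_id

    def execute(self, context):
        hook = MLflowClientHook(mlflow_conn_id=self.mlflow_conn_id)
        response = hook.run(
            endpoint="api/2.0/mlflow/experiments/create",
            request_params={
                "name": self.experiment_name,
                "artifact_location": self.artifact_location,
            },
        ).json()
        # The returned value is pushed to XCom, so downstream tasks can read
        # the new experiment's ID
        return response
```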
Step 4: Run your DAG
- In the Airflow UI, run the `mlflow_tutorial_dag` DAG by clicking the play button.
- Open the MLflow UI (if you are running locally, at `localhost:5000`) to see the data recorded by each task in your DAG.

  The `create_experiment` task created the `Housing` experiment, where your `Scaler` run from the `scale_features` task was recorded.

  The `create_registered_model` task created a registered model with two tags.

- Open your object storage (if you are using a local MinIO instance, at `localhost:9001`) to see your MLflow artifacts.
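If you prefer to verify the results programmatically instead of through the UIs, you can query the tracking server with the mlflow Python package. A minimal sketch, assuming a local MLflow instance at `localhost:5000` and the mlflow package installed on your machine:

```python
# Quick local check of what the DAG created, assuming MLflow at localhost:5000.
import mlflow
from mlflow import MlflowClient

mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()

# Experiments created by the create_experiment task (timestamp-prefixed "Housing")
for experiment in client.search_experiments():
    print(experiment.name, experiment.artifact_location)

# Registered models created by the create_registered_model task
for model in client.search_registered_models():
    print(model.name, model.tags)
```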
Conclusion
Congratulations! You used MLflow and Airflow together in three different ways. Learn more about other operators and hooks in the MLflow Airflow provider in the official GitHub repository.