Use Airflow object storage to interact with cloud storage in an ML pipeline
Airflow 2.8 introduced the Airflow object storage feature to simplify how you interact with remote and local object storage systems.
This tutorial demonstrates the object storage feature using a simple machine learning pipeline. The pipeline trains a classifier to predict whether a sentence is more likely to have been said by Star Trek's Captain Kirk or Captain Picard.
Object storage is currently considered experimental and might be subject to breaking changes in future releases. For more information see AIP-58.
Why use Airflow object storage?
Object stores are ubiquitous in modern data pipelines. They are used to store raw data, model artifacts, and image, video, text, and audio files, among other assets. Because each object storage system has its own file naming and path conventions, it can be challenging to work with data across many different object stores.
Airflow's object storage feature allows you to:
- Abstract your interactions with object stores using a Path API. Note that some limitations apply due to the nature of different remote object storage systems. See Cloud Object Stores are not real file systems.
- Switch between different object storage systems without having to change your DAG code.
- Transfer files between different object storage systems without needing to use XToYTransferOperator operators.
- Transfer large files efficiently. For object storage, Airflow uses shutil.copyfileobj() to stream files in chunks instead of loading them into memory in their entirety.
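
For example, a minimal sketch of this path-style API could look like the following. The bucket name `my-bucket`, the `raw/` prefix, the local target directory, and the `my_aws_conn` connection are assumptions for illustration only.

```python
from airflow.io.path import ObjectStoragePath

# Illustrative names only: the bucket, prefix, and connection ID are assumptions.
remote = ObjectStoragePath("s3://my-bucket/raw/", conn_id="my_aws_conn")
local = ObjectStoragePath("file:///tmp/archive/")  # assumes this directory exists

# The same pathlib-style API works regardless of which system backs the path.
for obj in remote.iterdir():
    if obj.is_file():
        print(obj.key)        # the object's key within the bucket
        obj.copy(dst=local)   # streamed in chunks, not loaded fully into memory
```

Because the paths are plain URIs plus a connection ID, swapping `s3://` for `gs://`, `abfs://`, or `file://` is usually enough to point the same code at a different storage system.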
Time to complete
This tutorial takes approximately 20 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Task Flow API. See Introduction to the TaskFlow API and Airflow decorators.
- The basics of pathlib.
Prerequisites
- The Astro CLI.
- An object storage system to interact with. This tutorial uses Amazon S3, but you can use Google Cloud Storage, Azure Blob Storage, or local file storage as well.
Step 1: Configure your Astro project
- Create a new Astro project:

  ```sh
  $ mkdir astro-object-storage-tutorial && cd astro-object-storage-tutorial
  $ astro dev init
  ```

- Add the following lines to your Astro project `requirements.txt` file to install the Amazon provider with the `s3fs` extra, as well as the scikit-learn package. If you are using Google Cloud Storage or Azure Blob Storage, install the Google provider or Azure provider instead.

  ```text
  apache-airflow-providers-amazon[s3fs]==8.13.0
  scikit-learn==1.3.2
  ```

- To create an Airflow connection to AWS S3, add the following environment variable to your `.env` file. Make sure to replace `<your-aws-access-key-id>` and `<your-aws-secret-access-key>` with your own AWS credentials. Adjust the connection type and parameters if you are using a different object storage system.

  ```text
  AIRFLOW_CONN_MY_AWS_CONN='{
      "conn_type": "aws",
      "login": "<your-aws-access-key-id>",
      "password": "<your-aws-secret-access-key>"
  }'
  ```
Step 2: Prepare your data
In this example pipeline, you will train a classifier to predict whether a sentence is more likely to have been said by Captain Kirk or Captain Picard. The training set consists of 3 quotes from each captain, stored in `.txt` files.
- Create a new bucket in your S3 account called `astro-object-storage-tutorial`.
- In the bucket, create a folder called `ingest` with two subfolders, `kirk_quotes` and `picard_quotes`.
- Upload the files from Astronomer's GitHub repository into the respective folders.
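
If you prefer to script this setup rather than use the AWS console, a rough sketch with boto3 could look like the following. It assumes boto3 is installed, your local AWS credentials are configured, and you have downloaded the quote files into `include/ingestion_data_object_store_use_case/` with `kirk_quotes` and `picard_quotes` subfolders mirroring the bucket layout.

```python
# Optional sketch: create the bucket and upload the quote files with boto3.
import boto3
from pathlib import Path

BUCKET = "astro-object-storage-tutorial"
LOCAL_DIR = Path("include/ingestion_data_object_store_use_case")  # assumed local copy

s3 = boto3.client("s3")
s3.create_bucket(Bucket=BUCKET)  # outside us-east-1, add a CreateBucketConfiguration

for file in LOCAL_DIR.glob("*_quotes/*.txt"):
    # e.g. kirk_quotes/<file>.txt -> ingest/kirk_quotes/<file>.txt
    key = f"ingest/{file.parent.name}/{file.name}"
    s3.upload_file(str(file), BUCKET, key)
```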
Step 3: Create your DAG
- In your `dags` folder, create a file called `object_storage_use_case.py`.

- Copy the following code into the file.

```python
"""
## Move files between object storage system locations in an MLOps pipeline
This DAG shows the basic use of the Airflow 2.8 Object Storage feature to
copy files between object storage system locations in an MLOps pipeline training
a Naive Bayes Classifier to distinguish between quotes from Captain Kirk and
Captain Picard and provide a prediction for a user-supplied quote.
To be able to run this DAG you will need to add the contents of
`include/ingestion_data_object_store_use_case` to your object storage system,
install the relevant provider package for your object storage and define an
Airflow connection to it.
If you do not want to use remote storage you can use `file://` for local object
storage and adjust the paths accordingly.
"""
from airflow.decorators import dag, task
from pendulum import datetime
from airflow.io.path import ObjectStoragePath
from airflow.models.baseoperator import chain
from airflow.models.param import Param
import joblib
import base64
import io
OBJECT_STORAGE_INGEST = "s3"
CONN_ID_INGEST = "my_aws_conn"
PATH_INGEST = "astro-object-storage-tutorial/ingest/"
OBJECT_STORAGE_TRAIN = "s3"
CONN_ID_TRAIN = "my_aws_conn"
PATH_TRAIN = "astro-object-storage-tutorial/train/"
OBJECT_STORAGE_ARCHIVE = "file"
CONN_ID_ARCHIVE = None
PATH_ARCHIVE = "include/archive/"
base_path_ingest = ObjectStoragePath(
f"{OBJECT_STORAGE_INGEST}://{PATH_INGEST}", conn_id=CONN_ID_INGEST
)
base_path_train = ObjectStoragePath(
f"{OBJECT_STORAGE_TRAIN}://{PATH_TRAIN}", conn_id=CONN_ID_TRAIN
)
base_path_archive = ObjectStoragePath(
f"{OBJECT_STORAGE_ARCHIVE}://{PATH_ARCHIVE}", conn_id=CONN_ID_ARCHIVE
)
@dag(
start_date=datetime(2023, 12, 1),
schedule=None,
catchup=False,
tags=["ObjectStorage"],
doc_md=__doc__,
params={
"my_quote": Param(
"Time and space are creations of the human mind.",
type="string",
description="Enter a quote to be classified as Kirk-y or Picard-y.",
)
},
)
def object_storage_use_case():
@task
def list_files_ingest(base: ObjectStoragePath) -> list[ObjectStoragePath]:
"""List files in remote object storage including subdirectories."""
labels = [obj for obj in base.iterdir() if obj.is_dir()]
files = [f for label in labels for f in label.iterdir() if f.is_file()]
return files
@task
def copy_files_ingest_to_train(src: ObjectStoragePath, dst: ObjectStoragePath):
"""Copy a file from one remote system to another.
The file is streamed in chunks using shutil.copyobj"""
src.copy(dst=dst)
@task
def list_files_train(base: ObjectStoragePath) -> list[ObjectStoragePath]:
"""List files in remote object storage."""
files = [f for f in base.iterdir() if f.is_file()]
return files
@task
def get_text_from_file(file: ObjectStoragePath) -> dict:
"""Read files in remote object storage."""
bytes = file.read_block(offset=0, length=None)
text = bytes.decode("utf-8")
key = file.key
filename = key.split("/")[-1]
label = filename.split("_")[-2]
return {"label": label, "text": text}
@task
def train_model(train_data: list[dict]):
"""Train a Naive Bayes Classifier using the files in the train folder."""
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
text_data = [d["text"] for d in train_data]
labels = [d["label"] for d in train_data]
X_train, X_test, y_train, y_test = train_test_split(
text_data, labels, test_size=0.2, random_state=42
)
model = make_pipeline(CountVectorizer(), MultinomialNB())
model.fit(X_train, y_train)
buffer = io.BytesIO()
joblib.dump(model, buffer)
buffer.seek(0)
encoded_model = base64.b64encode(buffer.getvalue()).decode("utf-8")
return encoded_model
@task
def use_model(encoded_model: str, **context):
"""Load the model and use it for prediction."""
my_quote = context["params"]["my_quote"]
model_binary = base64.b64decode(encoded_model)
buffer = io.BytesIO(model_binary)
model = joblib.load(buffer)
predictions = model.predict([my_quote])
print(f"The quote: '{my_quote}'")
print(f"sounds like it could have been said by {predictions[0].capitalize()}")
@task
def copy_files_train_to_archive(src: ObjectStoragePath, dst: ObjectStoragePath):
"""Copy a file from a remote system to local storage."""
src.copy(dst=dst)
@task
def empty_train(base: ObjectStoragePath):
"""Empty the train folder."""
for file in base.iterdir():
file.unlink()
files_ingest = list_files_ingest(base=base_path_ingest)
files_copied = copy_files_ingest_to_train.partial(dst=base_path_train).expand(
src=files_ingest
)
files_train = list_files_train(base=base_path_train)
chain(files_copied, files_train)
train_data = get_text_from_file.expand(file=files_train)
encoded_model = train_model(train_data=train_data)
use_model(encoded_model=encoded_model)
chain(
encoded_model,
copy_files_train_to_archive.partial(dst=base_path_archive).expand(
src=files_train
),
empty_train(base=base_path_train),
)
object_storage_use_case()
```

This DAG uses three different object storage locations, which can be aimed at different object storage systems by changing the `OBJECT_STORAGE_X`, `PATH_X`, and `CONN_ID_X` constants for each location:

- `base_path_ingest`: The base path for the ingestion data. This is the path to the training quotes you uploaded in Step 2.
- `base_path_train`: The base path for the training data. This is the location from which data for training the model is read.
- `base_path_archive`: The base path for the archive location where data that has previously been used for training is moved.
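
For example, to write the archive to Google Cloud Storage instead of local file storage, you would only change these constants; none of the task code changes. The bucket name and connection ID below are hypothetical, and the `gs://` scheme assumes the Google provider and `gcsfs` are installed.

```python
# Hypothetical retargeting of the archive location to Google Cloud Storage.
# Bucket name and connection ID are placeholders; requires the Google provider
# (with gcsfs available) and a matching Airflow connection.
OBJECT_STORAGE_ARCHIVE = "gs"
CONN_ID_ARCHIVE = "my_gcp_conn"
PATH_ARCHIVE = "my-archive-bucket/archive/"

base_path_archive = ObjectStoragePath(
    f"{OBJECT_STORAGE_ARCHIVE}://{PATH_ARCHIVE}", conn_id=CONN_ID_ARCHIVE
)
```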
The DAG consists of eight tasks to make a simple MLOps pipeline.
- The `list_files_ingest` task takes the `base_path_ingest` as an input and iterates through the subfolders `kirk_quotes` and `picard_quotes` to return all files in the folders as individual `ObjectStoragePath` objects. Using the object storage feature enables you to use the `.iterdir()`, `.is_dir()`, and `.is_file()` methods to list and evaluate object storage contents no matter which object storage system they are stored in.
- The `copy_files_ingest_to_train` task is dynamically mapped over the list of files returned by the `list_files_ingest` task. It takes the `base_path_train` as an input and copies the files from the `base_path_ingest` to the `base_path_train` location, providing an example of transferring files between different object storage systems using the `.copy()` method of the `ObjectStoragePath` object. Under the hood, this method uses `shutil.copyfileobj()` to stream files in chunks instead of loading them into memory in their entirety.
- The `list_files_train` task lists all files in the `base_path_train` location.
- The `get_text_from_file` task is dynamically mapped over the list of files returned by the `list_files_train` task to read the text from each file using the `.read_block()` method of the `ObjectStoragePath` object. Using the object storage feature enables you to switch the object storage system, for example to Azure Blob Storage, without needing to change the code. The file name provides the label for the text, and both the label and the full quote are returned as a dictionary to be passed via XCom to the next task.
- The `train_model` task trains a Naive Bayes classifier on the data returned by the `get_text_from_file` task. The fitted model is serialized as a base64-encoded string and passed via XCom to the next task.
- The `use_model` task deserializes the trained model to run a prediction on a user-provided quote, determining whether the quote is more likely to have been said by Captain Kirk or Captain Picard. The prediction is printed to the logs.
- The `copy_files_train_to_archive` task copies the files from the `base_path_train` to the `base_path_archive` location, analogous to the `copy_files_ingest_to_train` task.
- The `empty_train` task deletes all files from the `base_path_train` location.
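
As a side note, the trained model is passed between tasks as a base64-encoded XCom string for simplicity. Because `ObjectStoragePath` also exposes a file-like `.open()` interface, you could instead persist the model to object storage, as in this minimal sketch. The `models/` key and the reuse of `my_aws_conn` are assumptions and not part of the tutorial DAG.

```python
# Alternative sketch: store the fitted model in object storage instead of XCom.
from airflow.io.path import ObjectStoragePath
import joblib

# Hypothetical model location; reuses the bucket and connection from this tutorial.
model_path = ObjectStoragePath(
    "s3://astro-object-storage-tutorial/models/quote_classifier.joblib",
    conn_id="my_aws_conn",
)

def save_model(model):
    # Serialize straight into the object store via the file-like interface.
    with model_path.open("wb") as f:
        joblib.dump(model, f)

def load_model():
    with model_path.open("rb") as f:
        return joblib.load(f)
```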
Step 4: Run your DAG
- Run `astro dev start` in your Astro project to start Airflow, then open the Airflow UI at `localhost:8080`.

- In the Airflow UI, run the `object_storage_use_case` DAG by clicking the play button. Provide any quote you like to the `my_quote` Airflow param.

- After the DAG run completes, go to the task logs of the `use_model` task to see the prediction made by the model.

  ```text
  [2023-12-11, 00:19:22 UTC] {logging_mixin.py:188} INFO - The quote: 'Time and space are creations of the human mind.'
  [2023-12-11, 00:19:22 UTC] {logging_mixin.py:188} INFO - sounds like it could have been said by Picard
  ```
Conclusion
Congratulations! You just used Airflow's object storage feature to interact with files in different locations. To learn more about other methods and capabilities of this feature, see the OSS Airflow documentation.