Orchestrate MongoDB operations with Apache Airflow
MongoDB is a general-purpose document database that supports high-dimensional embeddings for text data and complex search queries. With the MongoDB Airflow provider, you can orchestrate interactions with MongoDB from your Airflow DAGs.
In this tutorial, you'll use Apache Airflow, MongoDB, and OpenAI to create a pipeline to ingest, embed, store, and query video game descriptions in MongoDB.
Why use Airflow with MongoDB?
MongoDB can store and query high-dimensional embeddings for text data, which is useful for applications like recommendation systems. You can orchestrate all steps of the process with Airflow, from ingesting data to querying it.
By integrating MongoDB with Airflow, you can:
- Use Airflow's data-driven scheduling capabilities to trigger operations in MongoDB based on other events in your data ecosystem, such as the completed training of a new model or the successful ingestion of new text data.
- Create pipelines interacting with MongoDB that adapt to changes in your data at runtime using dynamic Airflow tasks.
- Add Airflow features such as retries, branching, and alerts to MongoDB operations for handling complex task dependencies or task failures.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of MongoDB. See Getting started.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow decorators. See Introduction to the TaskFlow API and Airflow decorators.
- Airflow connections. See Managing your connections in Apache Airflow.
Prerequisites
- A MongoDB cluster. Astronomer recommends using MongoDB Atlas, a hosted MongoDB cluster with integrated data services that offers a free trial. See Getting started with MongoDB Atlas.
- The Astro CLI.
- An OpenAI API key of at least tier 1. If you do not want to use OpenAI, you will have to adjust the code in the embedding functions to use a different embedding service.
Step 1: Configure your MongoDB Atlas cluster
First, you need to configure your MongoDB Atlas cluster so Airflow can connect to it.

1. In your MongoDB Atlas account under Security, go to Database Access and create a database user with a password. Make sure the user has privileges to write data to the database, and save the password in a secure location, as you will need it later.

2. If you haven't already done so during your MongoDB setup, go to Security -> Network Access and add your public IP address to the IP access list. You can find your public IP address on Mac and Linux by running `curl ifconfig.co/`, or on Windows by running `ipconfig /all`.
Step 2: Configure your Astro project
Use the Astro CLI to create and run an Airflow project locally.
1. Create a new Astro project:

   ```sh
   $ mkdir astro-mongodb-tutorial && cd astro-mongodb-tutorial
   $ astro dev init
   ```

2. Add the following lines to the `requirements.txt` file of your Astro project:

   ```text
   apache-airflow-providers-mongo==4.2.2
   openai==1.52.0
   ```

   This installs the MongoDB provider package that contains all of the relevant MongoDB modules for Airflow, as well as the OpenAI package for embedding text data.

3. In the `.env` file, add the following environment variables:

   ```text
   AIRFLOW_CONN_MONGODB_DEFAULT='{
       "conn_type": "mongo",
       "host": "<your_cluster>.mongodb.net",
       "login": "<your_user>",
       "password": "<your_password>",
       "extra": {
           "srv": "true",
           "ssl": "true"
       }
   }'
   OPENAI_API_KEY="sk-<your_openai_api_key>"
   ```

   Replace `<your_openai_api_key>` with your OpenAI API key, and replace `<your_cluster>`, `<your_user>`, and `<your_password>` with your MongoDB Atlas cluster name and the database user and password you created in Step 1. You can find your MongoDB Atlas cluster name in the Atlas UI by clicking Connect in your cluster overview.
The `AIRFLOW_CONN_MONGODB_DEFAULT` environment variable creates a connection to your MongoDB cluster in Airflow with the connection ID `mongodb_default`. In version 4.2.2 and later of the MongoDB Airflow provider, you can also set the connection in the Airflow UI by providing the following connection details and leaving all other fields blank:

- Connection Id: `mongodb_default`
- Connection Type: `MongoDB`
- Host: `<your_cluster>.mongodb.net`
- Username: `<your_user>`
- Password: `<your_password>`
- SRV Connection: Checked
- Use SSL: Checked
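Under the hood, Airflow deserializes the JSON stored in the `AIRFLOW_CONN_MONGODB_DEFAULT` environment variable into a connection object, and the `srv` extra makes the hook use a `mongodb+srv://` connection string for Atlas. The sketch below, using hypothetical placeholder credentials rather than real ones, shows roughly how the JSON fields map to a MongoDB URI:

```python
import json

# Hypothetical example value of AIRFLOW_CONN_MONGODB_DEFAULT (placeholder credentials)
conn_json = """{
    "conn_type": "mongo",
    "host": "mycluster.mongodb.net",
    "login": "airflow_user",
    "password": "s3cret",
    "extra": {"srv": "true", "ssl": "true"}
}"""

conn = json.loads(conn_json)

# With "srv": "true", the client resolves the cluster via a DNS SRV record,
# which corresponds to a mongodb+srv:// URI instead of a plain mongodb:// URI.
scheme = "mongodb+srv" if conn["extra"].get("srv") == "true" else "mongodb"
uri = f"{scheme}://{conn['login']}:{conn['password']}@{conn['host']}/"

print(uri)  # mongodb+srv://airflow_user:s3cret@mycluster.mongodb.net/
```

Note that in a real connection string, a password containing special characters must be URL-escaped; the provider handles this for you when you configure the connection as shown above.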
Step 3: Add your data
The DAG in this tutorial runs a query on vectorized game descriptions.
Create a new file called `games.txt` in the `include` directory, then copy and paste the following information:
1 ::: Minecraft (2009) ::: sandbox ::: In a blocky, procedurally-generated world, players explore, gather resources, craft tools, and build structures, with the option to fight off monsters and explore vast environments.
2 ::: The Sims 2 (2004) ::: life simulation ::: Players create and control characters, managing their lives, relationships, and homes in a virtual world, while guiding them through everyday tasks and fulfilling personal ambitions.
3 ::: Call of Duty: Modern Warfare 2 (2009) ::: shooter ::: In this intense military first-person shooter, players join elite military operations across the globe, fighting in a fast-paced war against a dangerous enemy.
4 ::: Halo 3 (2007) ::: sci-fi shooter ::: Master Chief returns to finish the fight against the Covenant and the Flood, battling across futuristic environments in a war to save humanity.
5 ::: Star Wars: Battlefront 2 (2005) ::: action shooter ::: Players fight in large-scale battles across iconic Star Wars locations, engaging in both ground and space combat to claim victory.
6 ::: Age of Mythology (2002) ::: real-time strategy ::: Players command armies of mythological creatures and heroes, leveraging the powers of gods to wage war in an ancient, myth-inspired world.
7 ::: Stronghold (2001) ::: real-time strategy ::: In a medieval world, players build and manage castles, control armies, and lay siege to enemies, balancing economic management with military strategy.
8 ::: Command & Conquer: Tiberium Wars (2007) ::: real-time strategy ::: In a futuristic setting, players lead global military factions battling over the alien resource Tiberium, engaging in fast-paced tactical warfare.
9 ::: Minesweeper (1990) ::: puzzle ::: In this classic puzzle game, players use logic to uncover hidden mines on a grid without triggering any explosions.
10 ::: Addy Junior (2000) ::: educational ::: An educational game designed to help children improve reading, math, and problem-solving skills through engaging, playful activities.
11 ::: Impossible Creatures (2003) ::: real-time strategy ::: Players design and combine creatures using DNA from various animals, creating unique hybrids to lead in battle across an alternate 1930s world.
12 ::: World of Warcraft (2004) ::: MMORPG ::: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
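Each line of `games.txt` uses ` ::: ` as a field delimiter: an ID, a title with the release year in parentheses, a genre, and a description. As a rough standalone sketch of the parsing that the DAG's `extract` task performs later in this tutorial (with a shortened description for brevity):

```python
import re

line = "1 ::: Minecraft (2009) ::: sandbox ::: In a blocky world, players explore and build."

# Split on the ::: delimiter: [id, "Title (Year)", genre, description]
parts = line.split(":::")
title_year = parts[1].strip()

# Separate the title from the four-digit release year in parentheses
match = re.match(r"(.+) \((\d{4})\)", title_year)
title, year = match.groups()

game = {
    "title": title,
    "year": int(year),
    "genre": parts[2].strip(),
    "description": parts[3].strip(),
}
print(game["title"], game["year"], game["genre"])  # Minecraft 2009 sandbox
```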
Step 4: Create your DAG
In your Astro project `dags` folder, create a new file called `query_game_vectors.py`. Paste the following code into the file:
```python
"""
## Tutorial DAG: Load and query video game descriptions with MongoDB and OpenAI
"""

import logging
import os

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pendulum import datetime

t_log = logging.getLogger("airflow.task")

_MONGO_DB_CONN = os.getenv("MONGO_DB_CONN", "mongodb_default")
_MONGO_DB_DATABASE_NAME = os.getenv("MONGO_DB_DATABASE_NAME", "games")
_MONGO_DB_COLLECTION_NAME = os.getenv("MONGO_DB_COLLECTION_NAME", "games_nostalgia")
_MONGO_DB_SEARCH_INDEX_NAME = os.getenv("MONGO_DB_SEARCH_INDEX_NAME", "find_me_a_game")
_MONGO_DB_VECTOR_COLUMN_NAME = os.getenv("MONGO_DB_VECTOR_COLUMN_NAME", "vector")
_OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
# cast to int because environment variables are always strings
_OPENAI_EMBEDDING_MODEL_DIMENSIONS = int(
    os.getenv("OPENAI_EMBEDDING_MODEL_DIMENSIONS", 1536)
)
_DATA_TEXT_FILE_PATH = os.getenv("DATA_TEXT_FILE_PATH", "include/games.txt")

_COLLECTION_EXISTS_TASK_ID = "collection_already_exists"
_CREATE_COLLECTION_TASK_ID = "create_collection"
_CREATE_INDEX_TASK_ID = "create_search_index"
_INDEX_EXISTS_TASK_ID = "search_index_already_exists"


def _get_mongodb_database(
    mongo_db_conn_id: str = _MONGO_DB_CONN,
    mongo_db_database_name: str = _MONGO_DB_DATABASE_NAME,
):
    """
    Get the MongoDB database.
    Args:
        mongo_db_conn_id (str): The connection ID for the MongoDB connection.
        mongo_db_database_name (str): The name of the database.
    Returns:
        The MongoDB database.
    """
    hook = MongoHook(mongo_conn_id=mongo_db_conn_id)
    client = hook.get_conn()
    return client[mongo_db_database_name]


def _create_openai_embeddings(text: str, model: str):
    """
    Create embeddings for a text with the OpenAI API.
    Args:
        text (str): The text to create embeddings for.
        model (str): The OpenAI model to use.
    Returns:
        The embeddings for the text.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(input=text, model=model)
    embeddings = response.data[0].embedding

    return embeddings


@dag(
    start_date=datetime(2024, 10, 1),
    schedule=None,
    catchup=False,
    max_consecutive_failed_dag_runs=5,
    tags=["mongodb"],
    doc_md=__doc__,
    params={
        "game_concepts": Param(
            ["fantasy", "quests"],
            type="array",
            description=(
                "What kind of game do you want to play today?"
                + " Add one concept per line."
            ),
        ),
    },
)
def query_game_vectors():
    @task.branch
    def check_for_collection() -> str:
        "Check if the provided collection already exists and decide on the next step."
        database = _get_mongodb_database()
        collection_list = database.list_collection_names()
        if _MONGO_DB_COLLECTION_NAME in collection_list:
            return _COLLECTION_EXISTS_TASK_ID
        else:
            return _CREATE_COLLECTION_TASK_ID

    @task(task_id=_CREATE_COLLECTION_TASK_ID)
    def create_collection():
        "Create a new collection in the database."
        database = _get_mongodb_database()
        database.create_collection(_MONGO_DB_COLLECTION_NAME)

    collection_already_exists = EmptyOperator(task_id=_COLLECTION_EXISTS_TASK_ID)
    collection_ready = EmptyOperator(
        task_id="collection_ready", trigger_rule="none_failed"
    )

    @task
    def extract() -> list:
        """
        Extract the games from the text file.
        Returns:
            list: A list with the games.
        """
        import re

        with open(_DATA_TEXT_FILE_PATH, "r") as f:
            games = f.readlines()

        games_list = []
        for game in games:
            parts = game.split(":::")
            title_year = parts[1].strip()
            match = re.match(r"(.+) \((\d{4})\)", title_year)
            title, year = match.groups()
            year = int(year)
            genre = parts[2].strip()
            description = parts[3].strip()
            game_data = {
                "title": title,
                "year": year,
                "genre": genre,
                "description": description,
            }
            games_list.append(game_data)

        return games_list

    @task(map_index_template="{{ game_str }}")
    def transform_create_embeddings(game: dict) -> dict:
        """
        Create embeddings for the game description.
        Args:
            game (dict): A dictionary with the game's data.
        Returns:
            dict: The game's data with the embeddings.
        """
        embeddings = _create_openai_embeddings(
            text=game.get("description"), model=_OPENAI_EMBEDDING_MODEL
        )
        game[_MONGO_DB_VECTOR_COLUMN_NAME] = embeddings

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = f"{game['title']} ({game['year']}) - {game['genre']}"

        return game

    @task(trigger_rule="none_failed", map_index_template="{{ game_str }}")
    def load_data_to_mongo_db(game_data: dict) -> None:
        """
        Load the game data to the MongoDB collection.
        Args:
            game_data (dict): A dictionary with the game's data.
        """
        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        filter_query = {
            "title": game_data["title"],
            "year": game_data["year"],
            "genre": game_data["genre"],
        }
        game_str = f"{game_data['title']} ({game_data['year']}) - {game_data['genre']}"

        existing_document = collection.find_one(filter_query)
        if existing_document:
            if existing_document.get("description") != game_data["description"]:
                collection.update_one(
                    filter_query, {"$set": {"description": game_data["description"]}}
                )
                t_log.info(f"Updated description for record: {game_str}")
            else:
                t_log.info(f"Skipped duplicate record: {game_str}")
        else:
            collection.update_one(
                filter_query, {"$setOnInsert": game_data}, upsert=True
            )
            t_log.info(f"Inserted record: {game_str}")

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = game_str

    @task.branch
    def check_for_search_index() -> str:
        "Check if the provided index already exists and decide on the next step."
        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]
        index_list = collection.list_search_indexes().to_list()
        index_name_list = [index.get("name") for index in index_list]
        if _MONGO_DB_SEARCH_INDEX_NAME in index_name_list:
            return _INDEX_EXISTS_TASK_ID
        else:
            return _CREATE_INDEX_TASK_ID

    @task(task_id=_CREATE_INDEX_TASK_ID)
    def create_search_index():
        """
        Create a search index model for the MongoDB collection.
        """
        from pymongo.operations import SearchIndexModel

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        search_index_model = SearchIndexModel(
            definition={
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        _MONGO_DB_VECTOR_COLUMN_NAME: {
                            "type": "knnVector",
                            "dimensions": _OPENAI_EMBEDDING_MODEL_DIMENSIONS,
                            "similarity": "cosine",
                        }
                    },
                },
            },
            name=_MONGO_DB_SEARCH_INDEX_NAME,
        )

        collection.create_search_index(model=search_index_model)

    search_index_already_exists = EmptyOperator(task_id=_INDEX_EXISTS_TASK_ID)

    @task.sensor(
        poke_interval=10, timeout=3600, mode="poke", trigger_rule="none_failed"
    )
    def wait_for_full_indexing():
        """
        Wait for the search index to be fully built.
        """
        from airflow.sensors.base import PokeReturnValue

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]
        index_list = collection.list_search_indexes().to_list()
        index = next(
            (
                index
                for index in index_list
                if index.get("name") == _MONGO_DB_SEARCH_INDEX_NAME
            ),
            None,
        )
        if index:
            status = index.get("status")
            if status == "READY":
                t_log.info(f"Search index is {status}. Ready to query.")
                condition_met = True
            elif status == "FAILED":
                raise ValueError("Search index failed to build.")
            else:
                t_log.info(
                    f"Search index is {status}. Waiting for indexing to complete."
                )
                condition_met = False
        else:
            raise ValueError("Search index not found.")

        return PokeReturnValue(is_done=condition_met)

    @task
    def embed_concepts(**context):
        """
        Create embeddings for the provided concepts.
        """
        from openai import OpenAI

        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        game_concepts = context["params"]["game_concepts"]
        game_concepts_str = " ".join(game_concepts)

        embeddings = client.embeddings.create(
            input=game_concepts_str, model=_OPENAI_EMBEDDING_MODEL
        )

        return embeddings.to_dict()

    @task
    def query(query_vector: list):
        """
        Query the MongoDB collection for games based on the provided concepts.
        """
        db = _get_mongodb_database()
        collection = db[_MONGO_DB_COLLECTION_NAME]

        results = collection.aggregate(
            [
                {
                    "$vectorSearch": {
                        "exact": True,
                        "index": _MONGO_DB_SEARCH_INDEX_NAME,
                        "limit": 1,
                        "path": _MONGO_DB_VECTOR_COLUMN_NAME,
                        "queryVector": query_vector["data"][0]["embedding"],
                    }
                }
            ]
        )

        results_list = []
        for result in results:
            game_id = str(result["_id"])
            title = result["title"]
            year = result["year"]
            genre = result["genre"]
            description = result["description"]
            t_log.info(f"You should play {title}!")
            t_log.info(f"It was released in {year} and belongs to the {genre} genre.")
            t_log.info(f"Description: {description}")
            results_list.append(
                {
                    "game_id": game_id,
                    "title": title,
                    "year": year,
                    "genre": genre,
                    "description": description,
                }
            )

        return results_list

    _extract = extract()
    _transform_create_embeddings = transform_create_embeddings.expand(game=_extract)
    _load_data_to_mongo_db = load_data_to_mongo_db.expand(
        game_data=_transform_create_embeddings
    )
    _query = query(embed_concepts())

    chain(
        check_for_collection(),
        [create_collection(), collection_already_exists],
        collection_ready,
    )

    chain(
        collection_ready,
        check_for_search_index(),
        [create_search_index(), search_index_already_exists],
        wait_for_full_indexing(),
        _query,
    )

    chain(collection_ready, _load_data_to_mongo_db, _query)


query_game_vectors()
```
This DAG consists of thirteen tasks that make up a simple ML orchestration pipeline.

- First, the `check_for_collection` task checks if the `games_nostalgia` collection already exists in the `games` database. If it does, the collection creation is skipped; if not, the collection is created by the `create_collection` task.
- Once the collection is ready, a similar pattern is used to create a search index called `find_me_a_game` if it does not already exist.
- Simultaneously, the game descriptions are ingested in an ETL pipeline where the transformation step creates vector embeddings using OpenAI's `text-embedding-3-small` model. The embeddings are then stored in the `games_nostalgia` collection alongside the game data.
- After the search index is ready and the data is ingested, the custom sensor `wait_for_full_indexing` makes sure the search index is fully built before the `query` task is triggered.
- Finally, the `query` task queries the `games_nostalgia` collection for the game with the most similar description to the concepts provided in the Airflow params dictionary.
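The search index in this tutorial uses cosine similarity to rank stored vectors against the query vector: the game whose embedding points in the most similar direction to the embedded concepts wins. As a rough illustration of what that ranking computes, here is a pure-Python sketch using tiny hypothetical 3-dimensional vectors in place of the 1536-dimensional OpenAI embeddings:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


# Hypothetical tiny "embeddings" standing in for real OpenAI vectors
stored = {
    "World of Warcraft": [0.9, 0.8, 0.1],
    "Minesweeper": [0.1, 0.0, 0.9],
}
query_vector = [0.8, 0.9, 0.0]  # e.g. the embedded concepts "fantasy quests"

# Rank stored games by similarity to the query and keep the best (limit 1)
best = max(stored, key=lambda title: cosine_similarity(stored[title], query_vector))
print(best)  # World of Warcraft
```

The real `$vectorSearch` stage performs this comparison inside MongoDB Atlas, so the full embedding vectors never need to be pulled back into the Airflow worker.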
Step 5: Run the DAG and review the data
Now you can run the DAG manually to find a game to play!
1. Run `astro dev start` in your Astro project to start Airflow and open the Airflow UI at `localhost:8080`. Log in with `admin` as the username and password.

2. In the Airflow UI, run the `query_game_vectors` DAG by clicking the play button. Then, provide Airflow params for `game_concepts`.

3. After the DAG completes successfully, go to the task logs of the `query` task to see the game with the most similar description to the concepts you provided:

   ```text
   [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:343} INFO - You should play World of Warcraft!
   [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:344} INFO - It was released in 2004 and belongs to the MMORPG genre.
   [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:345} INFO - Description: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
   ```
Conclusion
Congratulations! You used Airflow and MongoDB to get a game suggestion! You can now use Airflow to orchestrate MongoDB operations in your own pipelines.