
Orchestrate MongoDB operations with Apache Airflow

MongoDB is a general-purpose document database that can store high-dimensional embeddings of text data and run complex search queries over them. With the MongoDB Airflow provider, you can orchestrate interactions with MongoDB from your Airflow DAGs.

In this tutorial, you'll use Apache Airflow, MongoDB, and OpenAI to create a pipeline to ingest, embed, store, and query video game descriptions in MongoDB.

Why use Airflow with MongoDB?

MongoDB can store and query high-dimensional embeddings for text data, which is useful for applications like recommendation systems. You can orchestrate all steps of the process with Airflow, from ingesting data to querying it.

By integrating MongoDB with Airflow, you can:

  • Use Airflow's data-driven scheduling capabilities to trigger operations in MongoDB based on other events in your data ecosystem, like the training of a new model having completed or the successful ingestion of new text data.
  • Create pipelines interacting with MongoDB that adapt to changes in your data at runtime using dynamic Airflow tasks.
  • Add Airflow features such as retries, branching, and alerts to MongoDB operations for handling complex task dependencies or task failures.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

  • A MongoDB cluster. Astronomer recommends using MongoDB Atlas, a hosted MongoDB cluster with integrated data services that offers a free trial. See Getting started with MongoDB Atlas.
  • The Astro CLI.
  • An OpenAI API key of at least tier 1. If you do not want to use OpenAI, you will have to adjust the code in the embedding functions to use a different embedding service.

Step 1: Configure your MongoDB Atlas cluster

First you will need to configure your MongoDB Atlas cluster so Airflow can connect to it.

  1. In your MongoDB Atlas account under Security, go to Database Access and create a database user with a password. Make sure the user has privileges to write data to the database. Save the password in a secure location, as you will need it later.

    Screenshot of the MongoDB Atlas UI showing how to add a new user.

  2. If you haven't already done so during your MongoDB setup, go to Security -> Network Access and add your public IP address to the IP access list. You can find your public IP address on Mac and Linux by running curl ifconfig.co/, or on Windows by running ipconfig /all.

Step 2: Configure your Astro project

Use the Astro CLI to create and run an Airflow project locally.

  1. Create a new Astro project:

    $ mkdir astro-mongodb-tutorial && cd astro-mongodb-tutorial
    $ astro dev init
  2. Add the following lines to the requirements.txt file of your Astro project:

    apache-airflow-providers-mongo==4.2.2
    openai==1.52.0

    This installs the Mongo provider package that contains all of the relevant MongoDB modules for Airflow, as well as the OpenAI package for embedding text data.

  3. In the .env file, add the following environment variables:

    AIRFLOW_CONN_MONGODB_DEFAULT='{
    "conn_type": "mongo",
    "host": "<your_cluster>.mongodb.net",
    "login": "<your_user>",
    "password": "<your_password>",
    "extra": {
    "srv": "true",
    "ssl": "true"
    }
    }'

    OPENAI_API_KEY="sk-<your_openai_api_key>"

    Replace <your_openai_api_key> with your OpenAI API key and <your_cluster>, <your_user>, and <your_password> with your MongoDB Atlas cluster name, database user, and database user password that you created in Step 1. You can find your MongoDB Atlas cluster name in the Atlas UI by clicking Connect in your cluster overview.

info

The AIRFLOW_CONN_MONGODB_DEFAULT environment variable is used to create a connection to your MongoDB cluster in Airflow with the connection ID mongodb_default. In the MongoDB Airflow provider version 4.2.2 and later, it is also possible to set the connection in the Airflow UI. To do so, provide the following connection details:

  • Connection Id: mongodb_default
  • Connection Type: MongoDB
  • Host: <your_cluster>.mongodb.net
  • Username: <your_user>
  • Password: <your_password>
  • SRV Connection: Checked
  • Use SSL: Checked

Leave all other fields blank.
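
Once your project is running (Step 5), you can optionally verify the connection from inside the Airflow environment, for example by opening a shell with astro dev bash and starting a Python interpreter. The following is a minimal sketch, assuming the mongodb_default connection defined above; it is not part of the tutorial DAG:

from airflow.providers.mongo.hooks.mongo import MongoHook

# Uses the mongodb_default connection configured in the .env file or the Airflow UI.
hook = MongoHook(mongo_conn_id="mongodb_default")
client = hook.get_conn()  # returns a pymongo MongoClient

# server_info() raises an exception if the cluster is unreachable or the credentials are wrong.
print(client.server_info()["version"])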

Step 3: Add your data

The DAG in this tutorial runs a query on vectorized game descriptions.

Create a new file called games.txt in the include directory, then copy and paste the following information:

1 ::: Minecraft (2009) ::: sandbox ::: In a blocky, procedurally-generated world, players explore, gather resources, craft tools, and build structures, with the option to fight off monsters and explore vast environments.
2 ::: The Sims 2 (2004) ::: life simulation ::: Players create and control characters, managing their lives, relationships, and homes in a virtual world, while guiding them through everyday tasks and fulfilling personal ambitions.
3 ::: Call of Duty: Modern Warfare 2 (2009) ::: shooter ::: In this intense military first-person shooter, players join elite military operations across the globe, fighting in a fast-paced war against a dangerous enemy.
4 ::: Halo 3 (2007) ::: sci-fi shooter ::: Master Chief returns to finish the fight against the Covenant and the Flood, battling across futuristic environments in a war to save humanity.
5 ::: Star Wars: Battlefront 2 (2005) ::: action shooter ::: Players fight in large-scale battles across iconic Star Wars locations, engaging in both ground and space combat to claim victory.
6 ::: Age of Mythology (2002) ::: real-time strategy ::: Players command armies of mythological creatures and heroes, leveraging the powers of gods to wage war in an ancient, myth-inspired world.
7 ::: Stronghold (2001) ::: real-time strategy ::: In a medieval world, players build and manage castles, control armies, and lay siege to enemies, balancing economic management with military strategy.
8 ::: Command & Conquer: Tiberium Wars (2007) ::: real-time strategy ::: In a futuristic setting, players lead global military factions battling over the alien resource Tiberium, engaging in fast-paced tactical warfare.
9 ::: Minesweeper (1990) ::: puzzle ::: In this classic puzzle game, players use logic to uncover hidden mines on a grid without triggering any explosions.
10 ::: Addy Junior (2000) ::: educational ::: An educational game designed to help children improve reading, math, and problem-solving skills through engaging, playful activities.
11 ::: Impossible Creatures (2003) ::: real-time strategy ::: Players design and combine creatures using DNA from various animals, creating unique hybrids to lead in battle across an alternate 1930s world.
12 ::: World of Warcraft (2004) ::: MMORPG ::: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
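
Each line follows the pattern <id> ::: <title> (<year>) ::: <genre> ::: <description>. The extract task of the DAG in Step 4 splits every line on ::: and parses the year from the parentheses, so any games you add yourself need to follow the same pattern. As a rough sketch of what one line becomes:

# Sketch of how one line of games.txt is parsed (field names match the DAG code in Step 4).
line = "9 ::: Minesweeper (1990) ::: puzzle ::: In this classic puzzle game, players use logic to uncover hidden mines on a grid without triggering any explosions."
_, title_year, genre, description = [part.strip() for part in line.split(":::")]
# title_year is "Minesweeper (1990)", which is split further into:
# {"title": "Minesweeper", "year": 1990, "genre": "puzzle", "description": "In this classic puzzle game, ..."}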

Step 4: Create your DAG

In your Astro project dags folder, create a new file called query_game_vectors.py. Paste the following code into the file:

"""
## Tutorial DAG: Load and query video game descriptions with MongoDB and OpenAI
"""

import logging
import os

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pendulum import datetime

t_log = logging.getLogger("airflow.task")

_MONGO_DB_CONN = os.getenv("MONGO_DB_CONN", "mongodb_default")
_MONGO_DB_DATABASE_NAME = os.getenv("MONGO_DB_DATABASE_NAME", "games")
_MONGO_DB_COLLECTION_NAME = os.getenv("MONGO_DB_COLLECTION_NAME", "games_nostalgia")
_MONGO_DB_SEARCH_INDEX_NAME = os.getenv("MONGO_DB_SEARCH_INDEX_NAME", "find_me_a_game")
_MONGO_DB_VECTOR_COLUMN_NAME = os.getenv("MONGO_DB_VECTOR_COLUMN_NAME", "vector")

_OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
_OPENAI_EMBEDDING_MODEL_DIMENSIONS = os.getenv(
    "OPENAI_EMBEDDING_MODEL_DIMENSIONS", 1536
)

_DATA_TEXT_FILE_PATH = os.getenv("DATA_TEXT_FILE_PATH", "include/games.txt")

_COLLECTION_EXISTS_TASK_ID = "collection_already_exists"
_CREATE_COLLECTION_TASK_ID = "create_collection"
_CREATE_INDEX_TASK_ID = "create_search_index"
_INDEX_EXISTS_TASK_ID = "search_index_already_exists"


def _get_mongodb_database(
    mongo_db_conn_id: str = _MONGO_DB_CONN,
    mongo_db_database_name: str = _MONGO_DB_DATABASE_NAME,
):
    """
    Get the MongoDB database.
    Args:
        mongo_db_conn_id (str): The connection ID for the MongoDB connection.
        mongo_db_database_name (str): The name of the database.
    Returns:
        The MongoDB database.
    """
    hook = MongoHook(mongo_conn_id=mongo_db_conn_id)
    client = hook.get_conn()
    return client[mongo_db_database_name]


def _create_openai_embeddings(text: str, model: str):
    """
    Create embeddings for a text with the OpenAI API.
    Args:
        text (str): The text to create embeddings for.
        model (str): The OpenAI model to use.
    Returns:
        The embeddings for the text.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(input=text, model=model)
    embeddings = response.data[0].embedding

    return embeddings


@dag(
    start_date=datetime(2024, 10, 1),
    schedule=None,
    catchup=False,
    max_consecutive_failed_dag_runs=5,
    tags=["mongodb"],
    doc_md=__doc__,
    params={
        "game_concepts": Param(
            ["fantasy", "quests"],
            type="array",
            description=(
                "What kind of game do you want to play today?"
                + " Add one concept per line."
            ),
        ),
    },
)
def query_game_vectors():

    @task.branch
    def check_for_collection() -> str:
        "Check if the provided collection already exists and decide on the next step."
        database = _get_mongodb_database()
        collection_list = database.list_collection_names()
        if _MONGO_DB_COLLECTION_NAME in collection_list:
            return _COLLECTION_EXISTS_TASK_ID
        else:
            return _CREATE_COLLECTION_TASK_ID

    @task(task_id=_CREATE_COLLECTION_TASK_ID)
    def create_collection():
        "Create a new collection in the database."
        database = _get_mongodb_database()
        database.create_collection(_MONGO_DB_COLLECTION_NAME)

    collection_already_exists = EmptyOperator(task_id=_COLLECTION_EXISTS_TASK_ID)
    collection_ready = EmptyOperator(
        task_id="collection_ready", trigger_rule="none_failed"
    )

    @task
    def extract() -> list:
        """
        Extract the games from the text file.
        Returns:
            list: A list with the games.
        """
        import re

        with open(_DATA_TEXT_FILE_PATH, "r") as f:
            games = f.readlines()

        games_list = []

        for game in games:

            parts = game.split(":::")
            title_year = parts[1].strip()
            match = re.match(r"(.+) \((\d{4})\)", title_year)

            title, year = match.groups()
            year = int(year)

            genre = parts[2].strip()
            description = parts[3].strip()

            game_data = {
                "title": title,
                "year": year,
                "genre": genre,
                "description": description,
            }

            games_list.append(game_data)

        return games_list

    @task(map_index_template="{{ game_str }}")
    def transform_create_embeddings(game: dict) -> dict:
        """
        Create embeddings for the game description.
        Args:
            game (dict): A dictionary with the game's data.
        Returns:
            dict: The game's data with the embeddings.
        """
        embeddings = _create_openai_embeddings(
            text=game.get("description"), model=_OPENAI_EMBEDDING_MODEL
        )
        game[_MONGO_DB_VECTOR_COLUMN_NAME] = embeddings

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = f"{game['title']} ({game['year']}) - {game['genre']}"

        return game

    @task(trigger_rule="none_failed", map_index_template="{{ game_str }}")
    def load_data_to_mongo_db(game_data: dict) -> None:
        """
        Load the game data to the MongoDB collection.
        Args:
            game_data (dict): A dictionary with the game's data.
        """

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        filter_query = {
            "title": game_data["title"],
            "year": game_data["year"],
            "genre": game_data["genre"],
        }

        game_str = f"{game_data['title']} ({game_data['year']}) - {game_data['genre']}"

        existing_document = collection.find_one(filter_query)

        if existing_document:
            if existing_document.get("description") != game_data["description"]:
                collection.update_one(
                    filter_query, {"$set": {"description": game_data["description"]}}
                )
                t_log.info(f"Updated description for record: {game_str}")
            else:
                t_log.info(f"Skipped duplicate record: {game_str}")
        else:
            collection.update_one(
                filter_query, {"$setOnInsert": game_data}, upsert=True
            )
            t_log.info(f"Inserted record: {game_str}")

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = game_str

    @task.branch
    def check_for_search_index() -> str:
        "Check if the provided index already exists and decide on the next step."
        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]
        index_list = collection.list_search_indexes().to_list()
        index_name_list = [index.get("name") for index in index_list]
        if _MONGO_DB_SEARCH_INDEX_NAME in index_name_list:
            return _INDEX_EXISTS_TASK_ID
        else:
            return _CREATE_INDEX_TASK_ID

    @task(task_id=_CREATE_INDEX_TASK_ID)
    def create_search_index():
        """
        Create a search index model for the MongoDB collection.
        """
        from pymongo.operations import SearchIndexModel

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        search_index_model = SearchIndexModel(
            definition={
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        _MONGO_DB_VECTOR_COLUMN_NAME: {
                            "type": "knnVector",
                            "dimensions": _OPENAI_EMBEDDING_MODEL_DIMENSIONS,
                            "similarity": "cosine",
                        }
                    },
                },
            },
            name=_MONGO_DB_SEARCH_INDEX_NAME,
        )

        collection.create_search_index(model=search_index_model)

    search_index_already_exists = EmptyOperator(task_id=_INDEX_EXISTS_TASK_ID)

    @task.sensor(
        poke_interval=10, timeout=3600, mode="poke", trigger_rule="none_failed"
    )
    def wait_for_full_indexing():
        """
        Wait for the search index to be fully built.
        """
        from airflow.sensors.base import PokeReturnValue

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        index_list = collection.list_search_indexes().to_list()
        index = next(
            (
                index
                for index in index_list
                if index.get("name") == _MONGO_DB_SEARCH_INDEX_NAME
            ),
            None,
        )

        if index:
            status = index.get("status")
            if status == "READY":
                t_log.info(f"Search index is {status}. Ready to query.")
                condition_met = True
            elif status == "FAILED":
                raise ValueError("Search index failed to build.")
            else:
                t_log.info(
                    f"Search index is {status}. Waiting for indexing to complete."
                )
                condition_met = False
        else:
            raise ValueError("Search index not found.")

        return PokeReturnValue(is_done=condition_met)

    @task
    def embed_concepts(**context):
        """
        Create embeddings for the provided concepts.
        """
        from openai import OpenAI

        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        game_concepts = context["params"]["game_concepts"]
        game_concepts_str = " ".join(game_concepts)

        embeddings = client.embeddings.create(
            input=game_concepts_str, model=_OPENAI_EMBEDDING_MODEL
        )

        return embeddings.to_dict()

    @task
    def query(query_vector: list):
        """
        Query the MongoDB collection for games based on the provided concepts.
        """

        db = _get_mongodb_database()
        collection = db[_MONGO_DB_COLLECTION_NAME]

        results = collection.aggregate(
            [
                {
                    "$vectorSearch": {
                        "exact": True,
                        "index": _MONGO_DB_SEARCH_INDEX_NAME,
                        "limit": 1,
                        "path": _MONGO_DB_VECTOR_COLUMN_NAME,
                        "queryVector": query_vector["data"][0]["embedding"],
                    }
                }
            ]
        )

        results_list = []

        for result in results:

            game_id = str(result["_id"])
            title = result["title"]
            year = result["year"]
            genre = result["genre"]
            description = result["description"]

            t_log.info(f"You should play {title}!")
            t_log.info(f"It was released in {year} and belongs to the {genre} genre.")
            t_log.info(f"Description: {description}")

            results_list.append(
                {
                    "game_id": game_id,
                    "title": title,
                    "year": year,
                    "genre": genre,
                    "description": description,
                }
            )

        return results_list

    _extract = extract()
    _transform_create_embeddings = transform_create_embeddings.expand(game=_extract)
    _load_data_to_mongo_db = load_data_to_mongo_db.expand(
        game_data=_transform_create_embeddings
    )

    _query = query(embed_concepts())

    chain(
        check_for_collection(),
        [create_collection(), collection_already_exists],
        collection_ready,
    )

    chain(
        collection_ready,
        check_for_search_index(),
        [create_search_index(), search_index_already_exists],
        wait_for_full_indexing(),
        _query,
    )

    chain(collection_ready, _load_data_to_mongo_db, _query)


query_game_vectors()

This DAG consists of thirteen tasks that form a simple ML orchestration pipeline.

  • First, the check_for_collection task checks if the games_nostalgia collection already exists in the games database. If it does, collection creation is skipped; if not, the create_collection task creates the collection.
  • Once the collection is ready, a similar pattern is used to create a search index find_me_a_game if it does not already exist.
  • Simultaneously, the game descriptions are ingested in an ETL pipeline in which the transformation step creates vector embeddings using OpenAI's text-embedding-3-small model. The embeddings are then stored in the games_nostalgia collection alongside the game data.
  • After the search index is ready and the data is ingested, the custom sensor wait_for_full_indexing makes sure the search index is fully built before the query task is triggered.
  • Finally, the query task queries the games_nostalgia collection for the game with the most similar description to the concepts provided in the Airflow params dictionary.

Screenshot of the Airflow UI showing the query_game_vectors DAG graph.
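
If you want to inspect what the DAG wrote to Atlas outside of Airflow, you can query the collection directly with pymongo. This is an optional sketch, not part of the tutorial DAG; it reuses the same placeholders as the connection in Step 2:

from pymongo import MongoClient

# Same placeholders as in Step 2; mongodb+srv URIs require the dnspython extra (pymongo[srv]).
client = MongoClient("mongodb+srv://<your_user>:<your_password>@<your_cluster>.mongodb.net")
doc = client["games"]["games_nostalgia"].find_one({"title": "Minecraft"})

# Expect the original fields plus a 1536-dimensional list stored in the "vector" field.
print(doc["genre"], len(doc["vector"]))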

Step 5: Run the DAG and review the data

Now you can run the DAG manually to find a game to play!

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080. Log in with admin as the username and password.

  2. In the Airflow UI, run the query_game_vectors DAG by clicking the play button. Then, provide Airflow params for game_concepts.

    Screenshot of the Airflow UI Trigger DAG view showing the concepts fantasy and quests selected as query params.

  3. After the DAG completes successfully, go to the task logs of the query task to see the game with the most similar description to the concepts you provided.

    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:343} INFO - You should play World of Warcraft!
    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:344} INFO - It was released in 2004 and belongs to the MMORPG genre.
    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:345} INFO - Description: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
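
As an alternative to triggering the run in the UI in step 2, you can pass the same params from the command line. This is a sketch using the Airflow CLI through the Astro CLI; it assumes the default Airflow behavior where the DAG run configuration overrides params:

$ astro dev run dags trigger query_game_vectors --conf '{"game_concepts": ["space", "battles"]}'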

Conclusion

Congratulations! You used Airflow and MongoDB to get a game suggestion! You can now use Airflow to orchestrate MongoDB operations in your own pipelines.
