# Deploy DAGs from Google Cloud Storage to Astro
## Prerequisites

- A Google Cloud Storage (GCS) bucket.
- An Astro Deployment with DAG-only deploys enabled.
- A Deployment API token, Workspace API token, or Organization API token.
- An Astro project containing your project configurations.
## DAG deploy template
Use this CI/CD template to deploy DAGs from a single GCS bucket to a single Astro Deployment. When you create or modify a DAG in the GCS bucket, a Cloud Function triggers and initializes an Astro project to deploy your DAGs using the Astro CLI.

To deploy any non-DAG code changes to Astro, you need to trigger a standard image deploy with your Astro project. When you do this, your Astro project must include the latest version of your DAGs from your GCS bucket. If your Astro project `dags` folder isn't up to date with your GCS DAGs bucket when you trigger this deploy, you will revert your DAGs back to the version hosted in your Astro project.
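One way to avoid this regression is to mirror the bucket's `dags` folder into your local project right before the image deploy. A minimal sketch with `gsutil`, using the placeholder bucket name `my-demo-bucket`:

```sh
# Pull the current DAGs from GCS into the local Astro project's dags folder
gsutil -m rsync -r gs://my-demo-bucket/dags dags
```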
1. Download the latest Astro CLI binary from GitHub releases, then rename the file to `astro_cli.tar.gz`. For example, to use Astro CLI version 1.13.0 in your template, download `astro_1.13.0_linux_amd64.tar.gz` and rename it to `astro_cli.tar.gz`.
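    A sketch of this step from the command line, assuming the standard `astro-cli` release URL pattern (verify it against the releases page for your version):

    ```sh
    # Download the 1.13.0 Linux AMD64 release and rename it for the template
    curl -LO https://github.com/astronomer/astro-cli/releases/download/v1.13.0/astro_1.13.0_linux_amd64.tar.gz
    mv astro_1.13.0_linux_amd64.tar.gz astro_cli.tar.gz
    ```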
2. In your GCS bucket, create the following new folders:

    - `dags`
    - `cli_binary`
3. Add `astro_cli.tar.gz` to `cli_binary`, as shown in the sketch below.
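    For example, with `gsutil` and the placeholder bucket name `my-demo-bucket`:

    ```sh
    # Upload the renamed CLI tarball into the bucket's cli_binary folder
    gsutil cp astro_cli.tar.gz gs://my-demo-bucket/cli_binary/
    ```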
4. Create a 1st gen Cloud Function with the Python 3.9 runtime in the same region as your storage bucket.
5. Create a Cloud Storage trigger with the following configuration:

    - Event provider: Select Cloud Storage.
    - Event: Select On finalizing/creating file in the selected bucket.
    - Bucket: Select your storage bucket.
6. Choose the Runtime Service Account. Ensure that the service account has `storage.objects.list` access to the Google Cloud Storage bucket.
7. Set the following environment variables for your Cloud Function:

    - `ASTRO_HOME`: `/tmp`
    - `ASTRO_API_TOKEN`: The value for your Workspace or Organization API token.
    - `ASTRO_DEPLOYMENT_ID`: Your Deployment ID.

    For production Deployments, ensure that you store the value for your API token in a secrets backend. See Secret Manager overview.
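    As an alternative to the console, steps 4 through 7 can be consolidated into one `gcloud` command. A sketch, assuming the hypothetical function name `astro-deploy`, region `us-central1`, bucket `my-demo-bucket`, and service account `gcf-runtime@my-project.iam.gserviceaccount.com`; for production, read the token from your secrets backend rather than passing it inline:

    ```sh
    gcloud functions deploy astro-deploy \
      --no-gen2 \
      --runtime=python39 \
      --region=us-central1 \
      --trigger-bucket=my-demo-bucket \
      --entry-point=astro_deploy \
      --service-account=gcf-runtime@my-project.iam.gserviceaccount.com \
      --set-env-vars=ASTRO_HOME=/tmp,ASTRO_DEPLOYMENT_ID=<your-deployment-id>,ASTRO_API_TOKEN=<your-api-token>
    ```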
8. Add the dependency `google-cloud-storage` to the `requirements.txt` file for your Cloud Function. See Specifying Dependencies in Python.
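    The function's `requirements.txt` then contains a single entry:

    ```text
    google-cloud-storage
    ```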
9. Add the following code to `main.py`:

    ```python
    import os
    import tarfile
    import subprocess

    from google.cloud import storage

    BUCKET = os.environ.get("BUCKET", "my-demo-bucket")
    deploymentId = os.environ.get("ASTRO_DEPLOYMENT_ID", "missing-deployment-id")


    def untar(filename: str, destination: str) -> None:
        with tarfile.open(filename) as file:
            file.extractall(destination)


    def run_command(cmd: str) -> None:
        p = subprocess.Popen("set -x; " + cmd, shell=True)
        p.communicate()


    def download_to_local(bucket_name: str, gcs_folder: str, local_dir: str = None) -> None:
        """Download the contents of a GCS folder to a local directory.

        :param bucket_name: the name of the GCS bucket
        :param gcs_folder: the folder path in the GCS bucket
        :param local_dir: a relative or absolute directory path in the local file system
        """
        ## create a storage client to access GCS objects
        storage_client = storage.Client()
        source_bucket = storage_client.bucket(bucket_name)
        ## get a list of all the files in the bucket folder
        blobs = source_bucket.list_blobs(prefix=gcs_folder)
        ## download each DAG file to local storage
        for blob in blobs:
            if blob.name.endswith('/'):
                continue
            target = blob.name if local_dir is None \
                else os.path.join(local_dir, os.path.relpath(blob.name, gcs_folder))
            print(target)
            if not os.path.exists(os.path.dirname(target)):
                os.makedirs(os.path.dirname(target))
            blob.download_to_filename(target)
            print("downloaded file")


    def astro_deploy(event, context) -> None:
        """Triggered by a change to a Cloud Storage bucket.

        :param event: Event payload.
        :param context: Metadata for the event.
        """
        base_dir = '/tmp/astro'
        ## download DAG files to temporary local storage
        download_to_local(BUCKET, 'dags', f'{base_dir}/dags')
        ## download the Astro CLI binary to /tmp/astro
        download_to_local(BUCKET, 'cli_binary', base_dir)
        ## deploy to Astro
        os.chdir(base_dir)
        untar('./astro_cli.tar.gz', '.')
        run_command('echo y | ./astro dev init')
        run_command(f'./astro deploy {deploymentId} --dags')
    ```
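    To smoke-test the function locally before creating it in GCP, one option (an assumption, not part of the template) is the Functions Framework for Python:

    ```sh
    # Install the local test harness alongside the function's dependency
    pip install functions-framework google-cloud-storage

    # Serve astro_deploy as a 1st gen background function on localhost:8080
    functions-framework --target=astro_deploy --signature-type=event
    ```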
10. If you haven't already, deploy your complete Astro project to your Deployment. See Deploy code.
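    For example, from the root of your Astro project, after syncing `dags` from the bucket as described above:

    ```sh
    # Build and push the full project image to the Deployment
    astro deploy <your-deployment-id>
    ```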
11. Add your DAGs to the `dags` folder in your storage bucket, as shown in the sketch below.

    :::info
    If you stage multiple commits to DAG files and push them all at once to your remote branch, the template only deploys DAG code changes from the most recent commit. It misses any code changes made in previous commits. To avoid this, either push commits individually or configure your repository to squash commits for pull requests that merge multiple commits simultaneously.
    :::
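    A sketch of the upload with `gsutil` (placeholder bucket name `my-demo-bucket`; each finalized object fires the Cloud Function once):

    ```sh
    # Sync local DAG files into the bucket's dags folder to trigger a deploy
    gsutil -m rsync -r dags gs://my-demo-bucket/dags
    ```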
12. In the Astro UI, select a Workspace, click Deployments, and then select your Deployment. Confirm that your deploy worked by checking the Deployment's DAG bundle version. The version name should include the time that you added the DAGs to your GCS bucket.