Skip to content

Commit

Permalink
Add Azure Container Instance as Execution Mode (#771)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvdende authored Feb 2, 2024
1 parent 7a9b697 commit 714ecbd
Show file tree
Hide file tree
Showing 15 changed files with 538 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ repos:
args:
- --exclude-file=tests/sample/manifest_model_version.json
- --skip=**/manifest.json
- -L connexion
- -L connexion,aci
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
Expand Down
37 changes: 37 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,37 @@
"kubernetes",
)

try:
from cosmos.operators.azure_container_instance import (
DbtLSAzureContainerInstanceOperator,
DbtRunAzureContainerInstanceOperator,
DbtRunOperationAzureContainerInstanceOperator,
DbtSeedAzureContainerInstanceOperator,
DbtSnapshotAzureContainerInstanceOperator,
DbtTestAzureContainerInstanceOperator,
)
except ImportError:
DbtLSAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtLSAzureContainerInstanceOperator", "azure-container-instance"
)
DbtRunAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtRunAzureContainerInstanceOperator", "azure-container-instance"
)
DbtRunOperationAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtRunOperationAzureContainerInstanceOperator",
"azure-container-instance",
)
DbtSeedAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtSeedAzureContainerInstanceOperator", "azure-container-instance"
)
DbtSnapshotAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtSnapshotAzureContainerInstanceOperator",
"azure-container-instance",
)
DbtTestAzureContainerInstanceOperator = MissingPackage(
"cosmos.operators.azure_container_instance.DbtTestAzureContainerInstanceOperator", "azure-container-instance"
)

__all__ = [
"ProjectConfig",
"ProfileConfig",
Expand Down Expand Up @@ -117,6 +148,12 @@
"DbtTestKubernetesOperator",
"DbtBuildKubernetesOperator",
"DbtSnapshotKubernetesOperator",
"DbtLSAzureContainerInstanceOperator",
"DbtRunOperationAzureContainerInstanceOperator",
"DbtRunAzureContainerInstanceOperator",
"DbtSeedAzureContainerInstanceOperator",
"DbtTestAzureContainerInstanceOperator",
"DbtSnapshotAzureContainerInstanceOperator",
"ExecutionMode",
"LoadMode",
"TestBehavior",
Expand Down
15 changes: 14 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
logger = get_logger(__name__)


def _snake_case_to_camelcase(value: str) -> str:
"""Convert snake_case to CamelCase
Example: foo_bar_baz -> FooBarBaz
:param value: Value to convert to CamelCase
:return: Converted value
"""
return "".join(x.capitalize() for x in value.lower().split("_"))


def calculate_operator_class(
execution_mode: ExecutionMode,
dbt_class: str,
Expand All @@ -37,7 +48,9 @@ def calculate_operator_class(
:returns: path string to the correspondent Cosmos Airflow operator
(e.g. cosmos.operators.localDbtSnapshotLocalOperator)
"""
return f"cosmos.operators.{execution_mode.value}.{dbt_class}{execution_mode.value.capitalize()}Operator"
return (
f"cosmos.operators.{execution_mode.value}.{dbt_class}{_snake_case_to_camelcase(execution_mode.value)}Operator"
)


def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[str]:
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ExecutionMode(Enum):
DOCKER = "docker"
KUBERNETES = "kubernetes"
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"


class TestIndirectSelection(Enum):
Expand Down
133 changes: 133 additions & 0 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from __future__ import annotations

from typing import Any, Callable, Sequence

from airflow.utils.context import Context
from cosmos.config import ProfileConfig

from cosmos.log import get_logger
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtRunMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtTestMixin,
DbtLSMixin,
DbtRunOperationMixin,
)

logger = get_logger(__name__)

# ACI is an optional dependency, so we need to check if it's installed
try:
from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
except ImportError:
raise ImportError(
"Could not import AzureContainerInstancesOperator. Ensure you've installed the Microsoft Azure provider "
"separately or with `pip install astronomer-cosmos[...,azure-container-instance]`."
)


class DbtAzureContainerInstanceBaseOperator(AzureContainerInstancesOperator, AbstractDbtBaseOperator): # type: ignore
"""
Executes a dbt core cli command in an Azure Container Instance
"""

template_fields: Sequence[str] = tuple(
list(AbstractDbtBaseOperator.template_fields) + list(AzureContainerInstancesOperator.template_fields)
)

def __init__(
self,
ci_conn_id: str,
resource_group: str,
name: str,
image: str,
region: str,
profile_config: ProfileConfig | None = None,
remove_on_error: bool = False,
fail_if_exists: bool = False,
registry_conn_id: str | None = None, # need to add a default for Airflow 2.3 support
**kwargs: Any,
) -> None:
self.profile_config = profile_config
super().__init__(
ci_conn_id=ci_conn_id,
resource_group=resource_group,
name=name,
image=image,
region=region,
remove_on_error=remove_on_error,
fail_if_exists=fail_if_exists,
registry_conn_id=registry_conn_id,
**kwargs,
)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
result = super().execute(context)
logger.info(result)

def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
# For the first round, we're going to assume that the command is dbt
# This means that we don't have openlineage support, but we will create a ticket
# to add that in the future
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
self.environment_variables: dict[str, Any] = {**env_vars, **self.environment_variables}
self.command: list[str] = dbt_cmd


class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core ls command.
"""


class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core seed command.
:param full_refresh: dbt optional arg - dbt will treat incremental models as table models
"""

template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]


class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core snapshot command.
"""


class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core run command.
"""

template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]


class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core test command.
"""

def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
# as of now, on_warning_callback in azure container instance executor does nothing
self.on_warning_callback = on_warning_callback


class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt core run-operation command.
:param macro_name: name of macro to execute
:param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
selected macro.
"""

template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator]
2 changes: 1 addition & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import airflow
import jinja2
from airflow import DAG
from airflow.compat.functools import cached_property
from functools import cached_property
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any

from airflow.compat.functools import cached_property
from functools import cached_property
from airflow.utils.python_virtualenv import prepare_virtualenv
from cosmos.hooks.subprocess import FullOutputSubprocessResult

Expand Down
Binary file added docs/_static/cosmos_aci_schematic.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
135 changes: 135 additions & 0 deletions docs/getting_started/azure-container-instance.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
.. _azure-container-instance

Check warning on line 1 in docs/getting_started/azure-container-instance.rst

View workflow job for this annotation

GitHub Actions / pages

malformed hyperlink target.
Azure Container Instance Execution Mode
=======================================
.. versionadded:: 1.4
This tutorial will guide you through the steps required to use Azure Container Instance as the Execution Mode for your dbt code with Astronomer Cosmos. Schematically, the guide will walk you through the steps required to build the following architecture:

Check warning on line 6 in docs/getting_started/azure-container-instance.rst

View workflow job for this annotation

GitHub Actions / pages

Explicit markup ends without a blank line; unexpected unindent.

.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/cosmos_aci_schematic.png
:width: 800

Prerequisites
+++++++++++++
1. Docker with docker daemon (Docker Desktop on MacOS). Follow the `Docker installation guide <https://docs.docker.com/engine/install/>`_.
2. Airflow
3. Azure CLI (install guide here: `Azure CLI <https://docs.microsoft.com/en-us/cli/azure/install-azure-cli>`_)
4. Astronomer-cosmos package containing the dbt Azure Container Instance operators
5. Azure account with:
1. A resource group
2. A service principal with `Contributor` permissions on the resource group
3. A Container Registry
4. A Postgres instance accessible from Azure. (we use an Azure Postgres instance in the example)
6. Docker image built with required dbt project and dbt DAG
7. dbt DAG with dbt Azure Container Instance operators in the Airflow DAGs directory to run in Airflow

More information on how to achieve 2-6 is detailed below.

Note that the steps below will walk you through an example, for which the code can be found HERE

Step-by-step guide
++++++++++++++++++

**Install Airflow and Cosmos**

Create a python virtualenv, activate it, upgrade pip to the latest version and install apache airflow & astronomer-postgres

.. code-block:: bash
python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install apache-airflow
pip install "astronomer-cosmos[dbt-postgres,azure-container-instance]"
**Setup Postgres database**

You will need a postgres database running to be used as the database for the dbt project. In order to have it accessible from Azure Container Instance, the easiest way is to create an Azure Postgres instance. For this, run the following (assuming you are logged into your Azure account)

.. code-block:: bash
az postgres server create -l westeurope -g <<<YOUR_RG>>> -n <<<YOUR_DATABASE_NAME>>> -u dbadmin -p <<<YOUR_PASSWORD_HERE>>> --sku-name B_Gen5_1 --ssl-enforcement Enabled
**Setup Azure Container Registry**
In order to run a container in Azure Container Instance, it needs access to the container image. In our setup, we will use Azure Container Registry for this. To set an Azure Container Registry up, you can use the following bash command:

.. code-block:: bash

Check failure on line 56 in docs/getting_started/azure-container-instance.rst

View workflow job for this annotation

GitHub Actions / pages

Error in "code-block" directive:
az acr create --name <<<YOUR_REGISTRY_NAME>>> --resource-group <<<YOUR_RG>>> --sku Basic --admin-enabled
**Build the dbt Docker image**

For the Docker operators to work, you need to create a docker image that will be supplied as image parameter to the dbt docker operators used in the DAG.

Clone the `cosmos-example <https://github.com/astronomer/cosmos-example.git>`_ repo

.. code-block:: bash
git clone https://github.com/astronomer/cosmos-example.git
cd cosmos-example
Create a docker image containing the dbt project files and dbt profile by using the `Dockerfile <https://github.com/astronomer/cosmos-example/blob/main/Dockerfile.azure_container_instance>`_, which will be supplied to the Docker operators.

.. code-block:: bash
docker build -t <<<YOUR_IMAGE_NAME_HERE>>:1.0.0 -f Dockerfile.azure_container_instance .
After this, the image needs to be pushed to the registry of your choice. Note that your image name should contain the name of your registry:
.. code-block:: bash
docker push <<<YOUR_IMAGE_NAMEHERE>>>:1.0.0
.. note::

You may need to ensure docker knows to use the right credentials. If using Azure Container Registry, this can be done by running the following command:
.. code-block:: bash
az acr login

Check failure on line 85 in docs/getting_started/azure-container-instance.rst

View workflow job for this annotation

GitHub Actions / pages

Unexpected indentation.
.. note::

If running on M1, you may need to set the following envvars for running the docker build command in case it fails

.. code-block:: bash
export DOCKER_BUILDKIT=0
export COMPOSE_DOCKER_CLI_BUILD=0
export DOCKER_DEFAULT_PLATFORM=linux/amd64
Take a read of the Dockerfile to understand what it does so that you could use it as a reference in your project.

- The `dbt profile <https://github.com/astronomer/cosmos-example/blob/main/example_postgres_profile.yml>`_ file is added to the image
- The dags directory containing the `dbt project jaffle_shop <https://github.com/astronomer/cosmos-example/tree/main/dags/dbt/jaffle_shop>`_ is added to the image
- The dbt_project.yml is replaced with `postgres_profile_dbt_project.yml <https://github.com/astronomer/cosmos-example/blob/main/postgres_profile_dbt_project.yml>`_ which contains the profile key pointing to postgres_profile as profile creation is not handled at the moment for K8s operators like in local mode.

**Setup Airflow Connections**
Now you have the required Azure infrastructure, you still need to add configuration to Airflow to ensure the infrastructure can be used. You'll need 3 connections:

1. ``aci_db``: a Postgres connection to your Azure Postgres instance.
2. ``aci``: an Azure Container Instance connection configured with a Service Principal with sufficient permissions (i.e. ``Contributor`` on the resource group in which you will use Azure Container Instances).
3. ``acr``: an Azure Container Registry connection configured for your Azure Container Registry.

Check out the ``airflow-settings.yml`` file `here <https://github.com/astronomer/cosmos-example/blob/main/airflow_settings.yaml>`_ for an example. If you are using Astro CLI, filling in the right values here will be enough for this to work.

**Setup and Trigger the DAG with Airflow**

Copy the dags directory from cosmos-example repo to your Airflow home

.. code-block:: bash
cp -r dags $AIRFLOW_HOME/
Run Airflow

.. code-block:: bash
airflow standalone
.. note::

You might need to run airflow standalone with ``sudo`` if your Airflow user is not able to access the docker socket URL or pull the images in the Kind cluster.

Log in to Airflow through a web browser ``http://localhost:8080/``, using the user ``airflow`` and the password described in the ``standalone_admin_password.txt`` file.

Enable and trigger a run of the `jaffle_shop_azure_container_instance <https://github.com/astronomer/cosmos-example/blob/main/dags/jaffle_shop_azure_container_instance.py>`_ DAG. You will be able to see the following successful DAG run.

.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_azure_container_instance.png
:width: 800
Loading

0 comments on commit 714ecbd

Please sign in to comment.