diff --git a/cosmos/profiles/__init__.py b/cosmos/profiles/__init__.py index fa8e5c370..5cc3109cc 100644 --- a/cosmos/profiles/__init__.py +++ b/cosmos/profiles/__init__.py @@ -9,6 +9,7 @@ from .bigquery.oauth import GoogleCloudOauthProfileMapping from .bigquery.service_account_file import GoogleCloudServiceAccountFileProfileMapping from .bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping +from .clickhouse.user_pass import ClickhouseUserPasswordProfileMapping from .databricks.token import DatabricksTokenProfileMapping from .exasol.user_pass import ExasolUserPasswordProfileMapping from .postgres.user_pass import PostgresUserPasswordProfileMapping @@ -25,6 +26,7 @@ profile_mappings: list[Type[BaseProfileMapping]] = [ AthenaAccessKeyProfileMapping, + ClickhouseUserPasswordProfileMapping, GoogleCloudServiceAccountFileProfileMapping, GoogleCloudServiceAccountDictProfileMapping, GoogleCloudOauthProfileMapping, diff --git a/cosmos/profiles/clickhouse/__init__.py b/cosmos/profiles/clickhouse/__init__.py new file mode 100644 index 000000000..bd94af5fe --- /dev/null +++ b/cosmos/profiles/clickhouse/__init__.py @@ -0,0 +1,5 @@ +"""Generic Airflow connection -> dbt profile mappings""" + +from .user_pass import ClickhouseUserPasswordProfileMapping + +__all__ = ["ClickhouseUserPasswordProfileMapping"] diff --git a/cosmos/profiles/clickhouse/user_pass.py b/cosmos/profiles/clickhouse/user_pass.py new file mode 100644 index 000000000..7d168895a --- /dev/null +++ b/cosmos/profiles/clickhouse/user_pass.py @@ -0,0 +1,70 @@ +"""Maps Airflow Postgres connections using user + password authentication to dbt profiles.""" + +from __future__ import annotations + +from typing import Any + +from ..base import BaseProfileMapping + + +class ClickhouseUserPasswordProfileMapping(BaseProfileMapping): + """ + Maps Airflow generic connections using user + password authentication to dbt Clickhouse profiles. + https://docs.getdbt.com/docs/core/connect-data-platform/clickhouse-setup + """ + + airflow_connection_type: str = "generic" + dbt_profile_type: str = "clickhouse" + default_port = 9000 + is_community = True + + required_fields = [ + "host", + "login", + "schema", + "clickhouse", + ] + secret_fields = [ + "password", + ] + airflow_param_mapping = { + "host": "host", + "login": "login", + "password": "password", + "port": "port", + "schema": "schema", + "clickhouse": "extra.clickhouse", + } + + def _set_default_param(self, profile_dict: dict[str, Any]) -> dict[str, Any]: + if not profile_dict.get("driver"): + profile_dict["driver"] = "native" + + if not profile_dict.get("port"): + profile_dict["port"] = self.default_port + + if not profile_dict.get("secure"): + profile_dict["secure"] = False + return profile_dict + + @property + def profile(self) -> dict[str, Any | None]: + """Gets profile. The password is stored in an environment variable.""" + profile_dict = { + **self.mapped_params, + **self.profile_args, + # password should always get set as env var + "password": self.get_env_var_format("password"), + } + + return self.filter_null(self._set_default_param(profile_dict)) + + @property + def mock_profile(self) -> dict[str, Any | None]: + """Gets mock profile.""" + + profile_dict = { + **super().mock_profile, + } + + return self._set_default_param(profile_dict) diff --git a/pyproject.toml b/pyproject.toml index 238b877e4..ea97a9c0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ dependencies = [ dbt-all = [ "dbt-athena", "dbt-bigquery", + "dbt-clickhouse", "dbt-databricks", "dbt-exasol", "dbt-postgres", @@ -53,6 +54,7 @@ dbt-all = [ ] dbt-athena = ["dbt-athena-community", "apache-airflow-providers-amazon>=8.0.0"] dbt-bigquery = ["dbt-bigquery"] +dbt-clickhouse = ["dbt-clickhouse"] dbt-databricks = ["dbt-databricks"] dbt-exasol = ["dbt-exasol"] dbt-postgres = ["dbt-postgres"] diff --git a/tests/profiles/clickhouse/__init__.py b/tests/profiles/clickhouse/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/profiles/clickhouse/test_clickhouse_userpass.py b/tests/profiles/clickhouse/test_clickhouse_userpass.py new file mode 100644 index 000000000..1f623c803 --- /dev/null +++ b/tests/profiles/clickhouse/test_clickhouse_userpass.py @@ -0,0 +1,117 @@ +"""Tests for the clickhouse profile.""" + +from unittest.mock import patch + +import pytest +from airflow.models.connection import Connection + +from cosmos.profiles import get_automatic_profile_mapping +from cosmos.profiles.clickhouse.user_pass import ( + ClickhouseUserPasswordProfileMapping, +) + + +@pytest.fixture() +def mock_clickhouse_conn(): # type: ignore + """Sets the connection as an environment variable.""" + conn = Connection( + conn_id="clickhouse_connection", + conn_type="generic", + host="my_host", + login="my_user", + password="my_password", + schema="my_database", + extra='{"clickhouse": "True"}', + ) + + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + yield conn + + +def test_connection_claiming1() -> None: + """ + Tests that the clickhouse profile mapping claims the correct connection type. + + should only claim when: + - conn_type == generic + And the following exist: + - host + - login + - password + - schema + - extra.clickhouse + """ + required_values = { + "conn_type": "generic", + "host": "my_host", + "login": "my_user", + "schema": "my_database", + "extra": '{"clickhouse": "True"}', + } + + def can_claim_with_missing_key(missing_key: str) -> bool: + values = required_values.copy() + del values[missing_key] + conn = Connection(**values) # type: ignore + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {}) + return profile_mapping.can_claim_connection() + + # if we're missing any of the required values, it shouldn't claim + for key in required_values: + assert not can_claim_with_missing_key(key), f"Failed when missing {key}" + + # if we have all the required values, it should claim + conn = Connection(**required_values) # type: ignore + with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn): + profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {}) + assert profile_mapping.can_claim_connection() + + +def test_profile_mapping_selected( + mock_clickhouse_conn: Connection, +) -> None: + """Tests that the correct profile mapping is selected.""" + profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, {}) + assert isinstance(profile_mapping, ClickhouseUserPasswordProfileMapping) + + +def test_profile_args(mock_clickhouse_conn: Connection) -> None: + """Tests that the profile values get set correctly.""" + profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={}) + + assert profile_mapping.profile == { + "type": "clickhouse", + "schema": mock_clickhouse_conn.schema, + "login": mock_clickhouse_conn.login, + "password": "{{ env_var('COSMOS_CONN_GENERIC_PASSWORD') }}", + "driver": "native", + "port": 9000, + "host": mock_clickhouse_conn.host, + "secure": False, + "clickhouse": "True", + } + + +def test_mock_profile() -> None: + """Tests that the mock_profile values get set correctly.""" + profile_mapping = ClickhouseUserPasswordProfileMapping( + "conn_id" + ) # get_automatic_profile_mapping("mock_clickhouse_conn.conn_id", profile_args={}) + + assert profile_mapping.mock_profile == { + "type": "clickhouse", + "schema": "mock_value", + "login": "mock_value", + "driver": "native", + "port": 9000, + "host": "mock_value", + "secure": False, + "clickhouse": "mock_value", + } + + +def test_profile_env_vars(mock_clickhouse_conn: Connection) -> None: + """Tests that the environment variables get set correctly.""" + profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={}) + assert profile_mapping.env_vars == {"COSMOS_CONN_GENERIC_PASSWORD": mock_clickhouse_conn.password}