Skip to content

Commit

Permalink
Add Clickhouse profile mapping (#1016)
Browse files Browse the repository at this point in the history
This PR adds Clickhouse profile mapping using a `generic` connection
type. To prevent cosmos from attaching all generic connections, it uses
a required field named `clickhouse` mapped to `extra.clickhouse`.

To ensure the profile is claimed, users must add the following JSON to
the extra field in the connection:
```JSON
{
    "clickhouse": "True"
}
```
Co-authored-by: Yaniv Rodenski <[email protected]>

Original PR by @roadan:
#353

Closes #95
  • Loading branch information
pankajastro authored Jun 6, 2024
1 parent f980df1 commit bbe4e86
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cosmos/profiles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .bigquery.oauth import GoogleCloudOauthProfileMapping
from .bigquery.service_account_file import GoogleCloudServiceAccountFileProfileMapping
from .bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping
from .clickhouse.user_pass import ClickhouseUserPasswordProfileMapping
from .databricks.token import DatabricksTokenProfileMapping
from .exasol.user_pass import ExasolUserPasswordProfileMapping
from .postgres.user_pass import PostgresUserPasswordProfileMapping
Expand All @@ -25,6 +26,7 @@

profile_mappings: list[Type[BaseProfileMapping]] = [
AthenaAccessKeyProfileMapping,
ClickhouseUserPasswordProfileMapping,
GoogleCloudServiceAccountFileProfileMapping,
GoogleCloudServiceAccountDictProfileMapping,
GoogleCloudOauthProfileMapping,
Expand Down
5 changes: 5 additions & 0 deletions cosmos/profiles/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Generic Airflow connection -> dbt profile mappings"""

from .user_pass import ClickhouseUserPasswordProfileMapping

__all__ = ["ClickhouseUserPasswordProfileMapping"]
70 changes: 70 additions & 0 deletions cosmos/profiles/clickhouse/user_pass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Maps Airflow Postgres connections using user + password authentication to dbt profiles."""

from __future__ import annotations

from typing import Any

from ..base import BaseProfileMapping


class ClickhouseUserPasswordProfileMapping(BaseProfileMapping):
"""
Maps Airflow generic connections using user + password authentication to dbt Clickhouse profiles.
https://docs.getdbt.com/docs/core/connect-data-platform/clickhouse-setup
"""

airflow_connection_type: str = "generic"
dbt_profile_type: str = "clickhouse"
default_port = 9000
is_community = True

required_fields = [
"host",
"login",
"schema",
"clickhouse",
]
secret_fields = [
"password",
]
airflow_param_mapping = {
"host": "host",
"login": "login",
"password": "password",
"port": "port",
"schema": "schema",
"clickhouse": "extra.clickhouse",
}

def _set_default_param(self, profile_dict: dict[str, Any]) -> dict[str, Any]:
if not profile_dict.get("driver"):
profile_dict["driver"] = "native"

if not profile_dict.get("port"):
profile_dict["port"] = self.default_port

if not profile_dict.get("secure"):
profile_dict["secure"] = False
return profile_dict

@property
def profile(self) -> dict[str, Any | None]:
"""Gets profile. The password is stored in an environment variable."""
profile_dict = {
**self.mapped_params,
**self.profile_args,
# password should always get set as env var
"password": self.get_env_var_format("password"),
}

return self.filter_null(self._set_default_param(profile_dict))

@property
def mock_profile(self) -> dict[str, Any | None]:
"""Gets mock profile."""

profile_dict = {
**super().mock_profile,
}

return self._set_default_param(profile_dict)
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies = [
dbt-all = [
"dbt-athena",
"dbt-bigquery",
"dbt-clickhouse",
"dbt-databricks",
"dbt-exasol",
"dbt-postgres",
Expand All @@ -53,6 +54,7 @@ dbt-all = [
]
dbt-athena = ["dbt-athena-community", "apache-airflow-providers-amazon>=8.0.0"]
dbt-bigquery = ["dbt-bigquery"]
dbt-clickhouse = ["dbt-clickhouse"]
dbt-databricks = ["dbt-databricks"]
dbt-exasol = ["dbt-exasol"]
dbt-postgres = ["dbt-postgres"]
Expand Down
Empty file.
117 changes: 117 additions & 0 deletions tests/profiles/clickhouse/test_clickhouse_userpass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Tests for the clickhouse profile."""

from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

from cosmos.profiles import get_automatic_profile_mapping
from cosmos.profiles.clickhouse.user_pass import (
ClickhouseUserPasswordProfileMapping,
)


@pytest.fixture()
def mock_clickhouse_conn(): # type: ignore
"""Sets the connection as an environment variable."""
conn = Connection(
conn_id="clickhouse_connection",
conn_type="generic",
host="my_host",
login="my_user",
password="my_password",
schema="my_database",
extra='{"clickhouse": "True"}',
)

with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
yield conn


def test_connection_claiming1() -> None:
"""
Tests that the clickhouse profile mapping claims the correct connection type.
should only claim when:
- conn_type == generic
And the following exist:
- host
- login
- password
- schema
- extra.clickhouse
"""
required_values = {
"conn_type": "generic",
"host": "my_host",
"login": "my_user",
"schema": "my_database",
"extra": '{"clickhouse": "True"}',
}

def can_claim_with_missing_key(missing_key: str) -> bool:
values = required_values.copy()
del values[missing_key]
conn = Connection(**values) # type: ignore
with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
return profile_mapping.can_claim_connection()

# if we're missing any of the required values, it shouldn't claim
for key in required_values:
assert not can_claim_with_missing_key(key), f"Failed when missing {key}"

# if we have all the required values, it should claim
conn = Connection(**required_values) # type: ignore
with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
assert profile_mapping.can_claim_connection()


def test_profile_mapping_selected(
mock_clickhouse_conn: Connection,
) -> None:
"""Tests that the correct profile mapping is selected."""
profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, {})
assert isinstance(profile_mapping, ClickhouseUserPasswordProfileMapping)


def test_profile_args(mock_clickhouse_conn: Connection) -> None:
"""Tests that the profile values get set correctly."""
profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={})

assert profile_mapping.profile == {
"type": "clickhouse",
"schema": mock_clickhouse_conn.schema,
"login": mock_clickhouse_conn.login,
"password": "{{ env_var('COSMOS_CONN_GENERIC_PASSWORD') }}",
"driver": "native",
"port": 9000,
"host": mock_clickhouse_conn.host,
"secure": False,
"clickhouse": "True",
}


def test_mock_profile() -> None:
"""Tests that the mock_profile values get set correctly."""
profile_mapping = ClickhouseUserPasswordProfileMapping(
"conn_id"
) # get_automatic_profile_mapping("mock_clickhouse_conn.conn_id", profile_args={})

assert profile_mapping.mock_profile == {
"type": "clickhouse",
"schema": "mock_value",
"login": "mock_value",
"driver": "native",
"port": 9000,
"host": "mock_value",
"secure": False,
"clickhouse": "mock_value",
}


def test_profile_env_vars(mock_clickhouse_conn: Connection) -> None:
"""Tests that the environment variables get set correctly."""
profile_mapping = get_automatic_profile_mapping(mock_clickhouse_conn.conn_id, profile_args={})
assert profile_mapping.env_vars == {"COSMOS_CONN_GENERIC_PASSWORD": mock_clickhouse_conn.password}

0 comments on commit bbe4e86

Please sign in to comment.