-
Notifications
You must be signed in to change notification settings - Fork 178
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support for Snowflake encrypted private key environment variable (#649)
Adds a snowflake mapping for encrypted private key using an environment variable Closes: #632 Breaking Change? This does rename the previous SnowflakeEncryptedPrivateKeyFilePemProfileMapping to SnowflakeEncryptedPrivateKeyFilePemProfileMapping but this makes it clearer as a new SnowflakeEncryptedPrivateKeyPemProfileMapping is added which supports the env variable. Also was only released as a pre-release change
- Loading branch information
1 parent
8e2d590
commit 2ec33b4
Showing
7 changed files
with
338 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
93 changes: 93 additions & 0 deletions
93
cosmos/profiles/snowflake/user_encrypted_privatekey_env_variable.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
"Maps Airflow Snowflake connections to dbt profiles if they use a user/private key." | ||
from __future__ import annotations | ||
|
||
import json | ||
from typing import TYPE_CHECKING, Any | ||
|
||
from ..base import BaseProfileMapping | ||
|
||
if TYPE_CHECKING: | ||
from airflow.models import Connection | ||
|
||
|
||
class SnowflakeEncryptedPrivateKeyPemProfileMapping(BaseProfileMapping): | ||
""" | ||
Maps Airflow Snowflake connections to dbt profiles if they use a user/private key. | ||
https://docs.getdbt.com/docs/core/connect-data-platform/snowflake-setup#key-pair-authentication | ||
https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/connections/snowflake.html | ||
""" | ||
|
||
airflow_connection_type: str = "snowflake" | ||
dbt_profile_type: str = "snowflake" | ||
is_community: bool = True | ||
|
||
required_fields = [ | ||
"account", | ||
"user", | ||
"database", | ||
"warehouse", | ||
"schema", | ||
"private_key", | ||
"private_key_passphrase", | ||
] | ||
secret_fields = [ | ||
"private_key", | ||
"private_key_passphrase", | ||
] | ||
airflow_param_mapping = { | ||
"account": "extra.account", | ||
"user": "login", | ||
"database": "extra.database", | ||
"warehouse": "extra.warehouse", | ||
"schema": "schema", | ||
"role": "extra.role", | ||
"private_key": "extra.private_key_content", | ||
"private_key_passphrase": "password", | ||
} | ||
|
||
def can_claim_connection(self) -> bool: | ||
# Make sure this isn't a private key path credential | ||
result = super().can_claim_connection() | ||
if result and self.conn.extra_dejson.get("private_key_file") is not None: | ||
return False | ||
return result | ||
|
||
@property | ||
def conn(self) -> Connection: | ||
""" | ||
Snowflake can be odd because the fields used to be stored with keys in the format | ||
'extra__snowflake__account', but now are stored as 'account'. | ||
This standardizes the keys to be 'account', 'database', etc. | ||
""" | ||
conn = super().conn | ||
|
||
conn_dejson = conn.extra_dejson | ||
|
||
if conn_dejson.get("extra__snowflake__account"): | ||
conn_dejson = {key.replace("extra__snowflake__", ""): value for key, value in conn_dejson.items()} | ||
|
||
conn.extra = json.dumps(conn_dejson) | ||
|
||
return conn | ||
|
||
@property | ||
def profile(self) -> dict[str, Any | None]: | ||
"Gets profile." | ||
profile_vars = { | ||
**self.mapped_params, | ||
**self.profile_args, | ||
"private_key": self.get_env_var_format("private_key"), | ||
"private_key_passphrase": self.get_env_var_format("private_key_passphrase"), | ||
} | ||
|
||
# remove any null values | ||
return self.filter_null(profile_vars) | ||
|
||
def transform_account(self, account: str) -> str: | ||
"Transform the account to the format <account>.<region> if it's not already." | ||
region = self.conn.extra_dejson.get("region") | ||
if region and region not in account: | ||
account = f"{account}.{region}" | ||
|
||
return str(account) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.