diff --git a/CHANGELOG.md b/CHANGELOG.md index b6a4858ed..9256b1ccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Added option for sap_rfc connector to get credentials from Azure KeyVault or directly passing dictionary inside flow. + ### Fixed - Fixed the `if_exists` parameter definition in the `CreateTableFromBlob` task. - Changed `requirements.txt` to level up version of `dbt-sqlserver` in order to fix bug with `MAXRECURSION` error in dbt_run + ### Changed ### Removed diff --git a/tests/integration/tasks/test_sap_rfc_to_df.py b/tests/integration/tasks/test_sap_rfc_to_df.py index 4151adfdc..7007a978c 100644 --- a/tests/integration/tasks/test_sap_rfc_to_df.py +++ b/tests/integration/tasks/test_sap_rfc_to_df.py @@ -1,13 +1,52 @@ +import pytest +import logging + +from viadot.exceptions import CredentialError from viadot.config import local_config from viadot.tasks import SAPRFCToDF def test_sap_rfc_to_df_bbp(): - sap_test_creds = local_config.get("SAP").get("QA") task = SAPRFCToDF( - credentials=sap_test_creds, - query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '2022%'", + query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'", func="BBP_RFC_READ_TABLE", ) - df = task.run() + df = task.run(sap_credentials_key="SAP", env="QA") assert len(df.columns) == 4 and not df.empty + + +def test_sap_rfc_to_df_wrong_sap_credential_key_bbp(caplog): + task = SAPRFCToDF( + query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'", + func="BBP_RFC_READ_TABLE", + ) + with pytest.raises( + CredentialError, + match="Sap_credentials_key: SAP_test is not stored neither in KeyVault or Local Config!", + ): + task.run( + sap_credentials_key="SAP_test", + ) + assert ( + f"Getting credentials from Azure Key Vault was not possible. Either there is no key: SAP_test or env: DEV or there is not Key Vault in your environment." + in caplog.text + ) + + +def test_sap_rfc_to_df_wrong_env_bbp(caplog): + task = SAPRFCToDF( + query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'", + func="BBP_RFC_READ_TABLE", + ) + with pytest.raises( + CredentialError, + match="Missing PROD_test credentials!", + ): + task.run( + sap_credentials_key="SAP", + env="PROD_test", + ) + assert ( + f"Getting credentials from Azure Key Vault was not possible. Either there is no key: SAP or env: PROD_test or there is not Key Vault in your environment." + in caplog.text + ) diff --git a/tests/integration/test_sap_rfc.py b/tests/integration/test_sap_rfc.py index 28ab044a2..2667d4d81 100644 --- a/tests/integration/test_sap_rfc.py +++ b/tests/integration/test_sap_rfc.py @@ -1,6 +1,9 @@ -from collections import OrderedDict +import pytest +import logging +from collections import OrderedDict from viadot.sources import SAPRFC, SAPRFCV2 +from viadot.exceptions import CredentialError sap = SAPRFC() sap2 = SAPRFCV2() @@ -192,7 +195,7 @@ def test___build_pandas_filter_query_v2(): def test_default_credentials_warning_SAPRFC(caplog): _ = SAPRFC() assert ( - "Your credentials will use DEV environment. If you would like to use different one - please specified it." + f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in sap_credentials parameter" in caplog.text ) @@ -200,6 +203,80 @@ def test_default_credentials_warning_SAPRFC(caplog): def test_default_credentials_warning_SAPRFCV2(caplog): _ = SAPRFCV2() assert ( - "Your credentials will use DEV environment. If you would like to use different one - please specified it." + f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in sap_credentials parameter" + in caplog.text + ) + + +def test_credentials_dictionary_wrong_key_warning_SAPRFC(caplog): + _ = SAPRFC( + sap_credentials={ + "sysnr_test": "test", + "user": "test", + "passwd": "test", + "ashost": "test", + } + ) + assert ( + f"Required key 'sysnr' not found in your 'sap_credentials' dictionary!" + in caplog.text + ) + assert ( + f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in sap_credentials parameter" in caplog.text ) + + +def test_credentials_dictionary_wrong_key_warning_SAPRFCV2(caplog): + _ = SAPRFCV2( + sap_credentials={ + "sysnr_test": "test", + "user": "test", + "passwd": "test", + "ashost": "test", + } + ) + assert ( + f"Required key 'sysnr' not found in your 'sap_credentials' dictionary!" + in caplog.text + ) + assert ( + f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in sap_credentials parameter" + in caplog.text + ) + + +def test_sap_credentials_key_wrong_value_error_SAPRFC(caplog): + with pytest.raises( + CredentialError, + match="Sap_credentials_key: SAP_test is not stored neither in KeyVault or Local Config!", + ): + with caplog.at_level(logging.ERROR): + _ = SAPRFC(sap_credentials_key="SAP_test") + + +def test_sap_credentials_key_wrong_value_error_SAPRFCV2(caplog): + with pytest.raises( + CredentialError, + match="Sap_credentials_key: SAP_test is not stored neither in KeyVault or Local Config!", + ): + with caplog.at_level(logging.ERROR): + _ = SAPRFC(sap_credentials_key="SAP_test") + + +def test_env_wrong_value_error_SAPRFC(caplog): + with pytest.raises( + CredentialError, + match="Missing PROD_test credentials!", + ): + with caplog.at_level(logging.ERROR): + _ = SAPRFC(env="PROD_test") + + +def test_env_wrong_value_error_SAPRFCV2(caplog): + with pytest.raises( + CredentialError, + match="Missing PROD_test credentials!", + ): + with caplog.at_level(logging.ERROR): + _ = SAPRFC(env="PROD_test") diff --git a/viadot/flows/sap_rfc_to_adls.py b/viadot/flows/sap_rfc_to_adls.py index d7a2ac390..5a380bbaa 100644 --- a/viadot/flows/sap_rfc_to_adls.py +++ b/viadot/flows/sap_rfc_to_adls.py @@ -10,13 +10,15 @@ class SAPRFCToADLS(Flow): def __init__( self, name: str, - query: str = None, + query: str, rfc_sep: str = None, rfc_replacement: str = "-", func: str = "RFC_READ_TABLE", rfc_total_col_width_character_limit: int = 400, rfc_unique_id: List[str] = None, sap_credentials: dict = None, + sap_credentials_key: str = "SAP", + env: str = "DEV", output_file_extension: str = ".parquet", local_file_path: str = None, file_sep: str = "\t", @@ -66,7 +68,9 @@ def __init__( rfc_unique_id=["VBELN", "LPRIO"], ... ) - sap_credentials (dict, optional): The credentials to use to authenticate with SAP. By default, they're taken from the local viadot config. + sap_credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None. + sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP". + env (str, optional): The key for sap_credentials_key pointing to the SAP environment. Defaults to "DEV" output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet". local_file_path (str, optional): Local destination path. Defaults to None. file_sep(str, optional): The separator to use in the CSV. Defaults to "\t". @@ -91,6 +95,8 @@ def __init__( self.rfc_total_col_width_character_limit = rfc_total_col_width_character_limit self.rfc_unique_id = rfc_unique_id self.sap_credentials = sap_credentials + self.sap_credentials_key = sap_credentials_key + self.env = env self.output_file_extension = output_file_extension self.local_file_path = local_file_path self.file_sep = file_sep @@ -120,7 +126,9 @@ def gen_flow(self) -> Flow: rfc_total_col_width_character_limit=self.rfc_total_col_width_character_limit, rfc_unique_id=self.rfc_unique_id, alternative_version=self.alternative_version, - credentials=self.sap_credentials, + sap_credentials=self.sap_credentials, + sap_credentials_key=self.sap_credentials_key, + env=self.env, flow=self, ) if self.validate_df_dict: diff --git a/viadot/sources/sap_rfc.py b/viadot/sources/sap_rfc.py index a9d109148..8d53bd230 100644 --- a/viadot/sources/sap_rfc.py +++ b/viadot/sources/sap_rfc.py @@ -237,6 +237,9 @@ def __init__( sep: str = None, func: str = "RFC_READ_TABLE", rfc_total_col_width_character_limit: int = 400, + sap_credentials: dict = None, + sap_credentials_key: str = "SAP", + env: str = "DEV", *args, **kwargs, ): @@ -250,25 +253,47 @@ def __init__( in case of too many columns for RFC function. According to SAP documentation, the limit is 512 characters. However, we observed SAP raising an exception even on a slightly lower number of characters, so we add a safety margin. Defaults to 400. + sap_credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None. + sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP". + env (str, optional): The key for sap_credentials_key pointing to the SAP environment. Defaults to "DEV". Raises: CredentialError: If provided credentials are incorrect. """ self._con = None - DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV") + self.sap_credentials_key = sap_credentials_key + self.env = env + + if isinstance(sap_credentials, dict): + credentials_keys = list(sap_credentials.keys()) + required_credentials_params = ["sysnr", "user", "passwd", "ashost"] + for key in required_credentials_params: + if key not in credentials_keys: + logger.warning( + f"Required key '{key}' not found in your 'sap_credentials' dictionary!" + ) + sap_credentials = None - credentials = kwargs.pop("credentials", None) - if credentials is None: - credentials = DEFAULT_CREDENTIALS - if credentials is None: - raise CredentialError("Missing credentials.") + if sap_credentials is None: logger.warning( - "Your credentials will use DEV environment. If you would like to use different one - please specified it." + f"Your credentials will use {env} environment from local config. If you would like to use different one - please specified it in sap_credentials parameter." ) + try: + sap_credentials = local_config.get(sap_credentials_key).get(env) + except AttributeError: + raise CredentialError( + f"sap_credentials_key: {sap_credentials_key} is not stored neither in KeyVault or Local Config!" + ) + if sap_credentials is None: + raise CredentialError(f"Missing {env} credentials!") - super().__init__(*args, credentials=credentials, **kwargs) + super().__init__( + *args, + **kwargs, + ) + self.sap_credentials = sap_credentials self.sep = sep self.client_side_filters = None self.func = func @@ -278,7 +303,7 @@ def __init__( def con(self) -> pyrfc.Connection: if self._con is not None: return self._con - con = pyrfc.Connection(**self.credentials) + con = pyrfc.Connection(**self.sap_credentials) self._con = con return con @@ -455,9 +480,11 @@ def _get_columns(self, sql: str, aliased: bool = False) -> List[str]: self.aliases_keyed_by_columns = aliases_keyed_by_columns columns = [ - aliases_keyed_by_columns[col] - if col in aliases_keyed_by_columns - else col + ( + aliases_keyed_by_columns[col] + if col in aliases_keyed_by_columns + else col + ) for col in columns ] @@ -677,6 +704,9 @@ def __init__( func: str = "RFC_READ_TABLE", rfc_total_col_width_character_limit: int = 400, rfc_unique_id: List[str] = None, + sap_credentials: dict = None, + sap_credentials_key: str = "SAP", + env: str = "DEV", *args, **kwargs, ): @@ -693,25 +723,44 @@ def __init__( 512 characters. However, we observed SAP raising an exception even on a slightly lower number of characters, so we add a safety margin. Defaults to 400. rfc_unique_id (List[str], optional): Reference columns to merge chunks Data Frames. These columns must to be unique. Defaults to None. + sap_credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None. + sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP". + env (str, optional): The key for sap_credentials_key pointing to the SAP environment. Defaults to "DEV". Raises: CredentialError: If provided credentials are incorrect. """ self._con = None - DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV") + self.sap_credentials_key = sap_credentials_key + self.env = env + + if isinstance(sap_credentials, dict): + credentials_keys = list(sap_credentials.keys()) + required_credentials_params = ["sysnr", "user", "passwd", "ashost"] + for key in required_credentials_params: + if key not in credentials_keys: + logger.warning( + f"Required key '{key}' not found in your 'sap_credentials' dictionary!" + ) + sap_credentials = None - credentials = kwargs.pop("credentials", None) - if credentials is None: - credentials = DEFAULT_CREDENTIALS - if credentials is None: - raise CredentialError("Missing credentials.") + if sap_credentials is None: logger.warning( - "Your credentials will use DEV environment. If you would like to use different one - please specified it in 'sap_credentials' variable inside the flow." + f"Your credentials will use {env} environment from local config. If you would like to use different one - please specified it in sap_credentials parameter." ) + try: + sap_credentials = local_config.get(sap_credentials_key).get(env) + except AttributeError: + raise CredentialError( + f"sap_credentials_key: {sap_credentials_key} is not stored neither in KeyVault or Local Config!" + ) + if sap_credentials is None: + raise CredentialError(f"Missing {env} credentials!") - super().__init__(*args, credentials=credentials, **kwargs) + super().__init__(*args, **kwargs) + self.sap_credentials = sap_credentials self.sep = sep self.replacement = replacement self.client_side_filters = None @@ -904,9 +953,11 @@ def _get_columns(self, sql: str, aliased: bool = False) -> List[str]: self.aliases_keyed_by_columns = aliases_keyed_by_columns columns = [ - aliases_keyed_by_columns[col] - if col in aliases_keyed_by_columns - else col + ( + aliases_keyed_by_columns[col] + if col in aliases_keyed_by_columns + else col + ) for col in columns ] diff --git a/viadot/tasks/sap_rfc.py b/viadot/tasks/sap_rfc.py index 19d55353d..2d47e573a 100644 --- a/viadot/tasks/sap_rfc.py +++ b/viadot/tasks/sap_rfc.py @@ -2,13 +2,18 @@ from typing import List import pandas as pd +import json from prefect import Task from prefect.utilities.tasks import defaults_from_attrs +from viadot.tasks import AzureKeyVaultSecret try: from viadot.sources import SAPRFC, SAPRFCV2 except ImportError: raise +from prefect.utilities import logging + +logger = logging.get_logger() class SAPRFCToDF(Task): @@ -19,7 +24,9 @@ def __init__( replacement: str = "-", func: str = None, rfc_total_col_width_character_limit: int = 400, - credentials: dict = None, + sap_credentials: dict = None, + sap_credentials_key: str = "SAP", + env: str = "DEV", max_retries: int = 3, retry_delay: timedelta = timedelta(seconds=10), timeout: int = 3600, @@ -51,13 +58,17 @@ def __init__( in case of too many columns for RFC function. According to SAP documentation, the limit is 512 characters. However, we observed SAP raising an exception even on a slightly lower number of characters, so we add a safety margin. Defaults to 400. - credentials (dict, optional): The credentials to use to authenticate with SAP. + sap_credentials (dict, optional): The credentials to use to authenticate with SAP. By default, they're taken from the local viadot config. + sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP". + env (str, optional): The key for sap_credentials_key pointing to the SAP environment. Defaults to "DEV". By default, they're taken from the local viadot config. """ self.query = query self.sep = sep self.replacement = replacement - self.credentials = credentials + self.sap_credentials = sap_credentials + self.sap_credentials_key = sap_credentials_key + self.env = env self.func = func self.rfc_total_col_width_character_limit = rfc_total_col_width_character_limit @@ -76,14 +87,16 @@ def __init__( "replacement", "func", "rfc_total_col_width_character_limit", - "credentials", + "sap_credentials", ) def run( self, - query: str = None, + query: str, sep: str = None, replacement: str = "-", - credentials: dict = None, + sap_credentials: dict = None, + sap_credentials_key: str = "SAP", + env: str = "DEV", func: str = None, rfc_total_col_width_character_limit: int = None, rfc_unique_id: List[str] = None, @@ -97,6 +110,9 @@ def run( multiple options are automatically tried. Defaults to None. replacement (str, optional): In case of sep is on a columns, set up a new character to replace inside the string to avoid flow breakdowns. Defaults to "-". + sap_credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None. + sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP". + env (str, optional): The key for sap_credentials_key pointing to the SAP environment. Defaults to "DEV". func (str, optional): SAP RFC function to use. Defaults to None. rfc_total_col_width_character_limit (int, optional): Number of characters by which query will be split in chunks in case of too many columns for RFC function. According to SAP documentation, the limit is @@ -116,8 +132,17 @@ def run( Returns: pd.DataFrame: DataFrame with SAP data. """ - if query is None: - raise ValueError("Please provide the query.") + + if sap_credentials is None: + try: + credentials_str = AzureKeyVaultSecret( + secret=sap_credentials_key, + ).run() + sap_credentials = json.loads(credentials_str).get(env) + except: + logger.warning( + f"Getting credentials from Azure Key Vault was not possible. Either there is no key: {sap_credentials_key} or env: {env} or there is not Key Vault in your environment." + ) if alternative_version is True: if rfc_unique_id: @@ -127,7 +152,9 @@ def run( sap = SAPRFCV2( sep=sep, replacement=replacement, - credentials=credentials, + sap_credentials=sap_credentials, + sap_credentials_key=sap_credentials_key, + env=env, func=func, rfc_total_col_width_character_limit=rfc_total_col_width_character_limit, rfc_unique_id=rfc_unique_id, @@ -135,7 +162,9 @@ def run( else: sap = SAPRFC( sep=sep, - credentials=credentials, + sap_credentials=sap_credentials, + sap_credentials_key=sap_credentials_key, + env=env, func=func, rfc_total_col_width_character_limit=rfc_total_col_width_character_limit, )