diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py
index c337336de..2c01049e8 100644
--- a/tests/integration/flows/test_sap_bw_to_adls.py
+++ b/tests/integration/flows/test_sap_bw_to_adls.py
@@ -5,6 +5,7 @@
 import pytest
 
 from viadot.flows import SAPBWToADLS
+from viadot.exceptions import ValidationError
 
 DATA = {
     "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"],
@@ -51,3 +52,74 @@ def test_sap_bw_to_adls_flow_run(mocked_class):
     assert result.is_successful()
     os.remove("test_sap_bw_to_adls_flow_run.parquet")
     os.remove("test_sap_bw_to_adls_flow_run.json")
+
+
+def test_sap_bw_validate_df_task_success(caplog):
+    flow = SAPBWToADLS(
+        "test_sap_bw_validate_df_task_success",
+        mdx_query="""
+            SELECT
+                { [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS,
+            NON EMPTY
+                {
+                    { [0CALDAY].[20230102] } *
+                    {
+                        [0COMP_CODE].[1120]
+                    }
+                }
+                ON ROWS
+                FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA
+            """,
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_file_name=ADLS_FILE_NAME,
+        validate_df_dict={
+            "column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 10},
+            "column_list_to_match": [
+                "[0CALDAY].[LEVEL01].[MEMBER_CAPTION]",
+                "[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]",
+                "[Measures].[003YPR4ERQZRMSMFEQ8123HRR]",
+            ],
+        },
+    )
+    result = flow.run()
+    assert result.is_successful()
+    os.remove("test_sap_bw_validate_df_task_success.parquet")
+    os.remove("test_sap_bw_validate_df_task_success.json")
+
+
+def test_sap_bw_validate_df_task_fail(caplog):
+    flow = SAPBWToADLS(
+        "test_sap_bw_validate_df_task_fail",
+        mdx_query="""
+            SELECT
+                { [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS,
+            NON EMPTY
+                {
+                    { [0CALDAY].[20230102] } *
+                    {
+                        [0COMP_CODE].[1120]
+                    }
+                }
+                ON ROWS
+                FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA
+            """,
+        overwrite_adls=True,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_file_name=ADLS_FILE_NAME,
+        validate_df_dict={
+            "column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 8},
+            "column_list_to_match": [
+                "[0CALDAY].[LEVEL01].[MEMBER_CAPTION]",
+                "[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]",
+            ],
+        },
+    )
+
+    try:
+        flow.run()
+    except ValidationError:
+        pass
+
+    os.remove("test_sap_bw_validate_df_task_fail.parquet")
+    os.remove("test_sap_bw_validate_df_task_fail.json")
diff --git a/viadot/flows/sap_bw_to_adls.py b/viadot/flows/sap_bw_to_adls.py
index 375f30775..9619df98e 100644
--- a/viadot/flows/sap_bw_to_adls.py
+++ b/viadot/flows/sap_bw_to_adls.py
@@ -14,6 +14,7 @@
     df_to_parquet,
     dtypes_to_json_task,
     update_dtypes_dict,
+    validate_df,
 )
 from viadot.tasks import AzureDataLakeUpload, SAPBWToDF
 
@@ -35,6 +36,7 @@ def __init__(
         overwrite_adls: bool = True,
         vault_name: str = None,
         sp_credentials_secret: str = None,
+        validate_df_dict: dict = None,
         *args: List[any],
         **kwargs: Dict[str, Any],
     ):
@@ -56,6 +58,8 @@ def __init__(
             overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
             vault_name (str, optional): The name of the vault from which to obtain the secrets.. Defaults to None.
             sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
+            validate_df_dict (dict, optional): A dictionary with optional lists of tests to verify the output DataFrame. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
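+                Example: {"column_size": {"COLUMN_NAME": 10}, "column_list_to_match": ["COLUMN_1", "COLUMN_2"]}.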
""" self.sapbw_credentials = sapbw_credentials self.sapbw_credentials_key = sapbw_credentials_key @@ -89,6 +92,7 @@ def __init__( self.overwrite_adls = overwrite_adls self.vault_name = vault_name self.sp_credentials_secret = sp_credentials_secret + self.validate_df_dict = validate_df_dict super().__init__(*args, name=name, **kwargs) self.gen_flow() @@ -110,6 +114,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self) diff --git a/viadot/sources/sap_bw.py b/viadot/sources/sap_bw.py index 73b1d2efa..8f4fb0583 100644 --- a/viadot/sources/sap_bw.py +++ b/viadot/sources/sap_bw.py @@ -25,6 +25,7 @@ def __init__(self, credentials: dict, *args, **kwargs): Raises: CredentialError: If provided credentials are incorrect. """ + self.credentials = credentials if credentials is None: raise CredentialError("Missing credentials.") diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py index 4d34f9960..acc92c246 100644 --- a/viadot/tasks/sap_bw.py +++ b/viadot/tasks/sap_bw.py @@ -1,6 +1,7 @@ import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret +from viadot.tasks import AzureKeyVaultSecret from prefect.utilities import logging from viadot.exceptions import ValidationError @@ -27,13 +28,14 @@ def __init__( sapbw_credentials_key (str, optional): Azure KV secret. Defaults to "SAP". env (str, optional): SAP environment. Defaults to "BW". """ - if sapbw_credentials is None: - self.sapbw_credentials = credentials_loader.run( - credentials_secret=sapbw_credentials_key - ).get(env) + if not sapbw_credentials: + credentials_str = AzureKeyVaultSecret( + sapbw_credentials_key, + ).run() + self.sapbw_credentials = json.loads(credentials_str).get(env) else: - self.sapbw_credentials = sapbw_credentials + self.sapbw_credentials = PrefectSecret("SAP_BW").run() super().__init__( name="sapbw_to_df",