Skip to content

Commit

Permalink
Merge pull request #781 from gwieloch/sapbw_validate_df
Browse files Browse the repository at this point in the history
added `validate_df` task to `SAPBWToADLS` class
  • Loading branch information
m-paz authored Oct 26, 2023
2 parents 15fabd7 + 46db7a8 commit a293f0f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
72 changes: 72 additions & 0 deletions tests/integration/flows/test_sap_bw_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from viadot.flows import SAPBWToADLS
from viadot.exceptions import ValidationError

DATA = {
"[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"],
Expand Down Expand Up @@ -51,3 +52,74 @@ def test_sap_bw_to_adls_flow_run(mocked_class):
assert result.is_successful()
os.remove("test_sap_bw_to_adls_flow_run.parquet")
os.remove("test_sap_bw_to_adls_flow_run.json")


def test_sap_bw_validate_df_task_success(caplog):
flow = SAPBWToADLS(
"test_sap_bw_validate_df_task_success",
mdx_query="""
SELECT
{ [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS,
NON EMPTY
{
{ [0CALDAY].[20230102] } *
{
[0COMP_CODE].[1120]
}
}
ON ROWS
FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA
""",
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={
"column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 10},
"column_list_to_match": [
"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]",
"[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]",
"[Measures].[003YPR4ERQZRMSMFEQ8123HRR]",
],
},
)
result = flow.run()
assert result.is_successful()
os.remove("test_sap_bw_validate_df_task_success.parquet")
os.remove("test_sap_bw_validate_df_task_success.json")


def test_sap_bw_validate_df_task_fail(caplog):
flow = SAPBWToADLS(
"test_sap_bw_validate_df_task_fail",
mdx_query="""
SELECT
{ [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS,
NON EMPTY
{
{ [0CALDAY].[20230102] } *
{
[0COMP_CODE].[1120]
}
}
ON ROWS
FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA
""",
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={
"column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 8},
"column_list_to_match": [
"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]",
"[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]",
],
},
)

try:
flow.run()
except ValidationError:
pass

os.remove("test_sap_bw_validate_df_task_fail.parquet")
os.remove("test_sap_bw_validate_df_task_fail.json")
10 changes: 10 additions & 0 deletions viadot/flows/sap_bw_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
df_to_parquet,
dtypes_to_json_task,
update_dtypes_dict,
validate_df,
)
from viadot.tasks import AzureDataLakeUpload, SAPBWToDF

Expand All @@ -35,6 +36,7 @@ def __init__(
overwrite_adls: bool = True,
vault_name: str = None,
sp_credentials_secret: str = None,
validate_df_dict: dict = None,
*args: List[any],
**kwargs: Dict[str, Any],
):
Expand All @@ -56,6 +58,7 @@ def __init__(
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
vault_name (str, optional): The name of the vault from which to obtain the secrets.. Defaults to None.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
"""
self.sapbw_credentials = sapbw_credentials
self.sapbw_credentials_key = sapbw_credentials_key
Expand Down Expand Up @@ -89,6 +92,7 @@ def __init__(
self.overwrite_adls = overwrite_adls
self.vault_name = vault_name
self.sp_credentials_secret = sp_credentials_secret
self.validate_df_dict = validate_df_dict

super().__init__(*args, name=name, **kwargs)
self.gen_flow()
Expand All @@ -110,6 +114,12 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
validation_task = validate_df.bind(
df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(df, flow=self)

df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self)
dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self)

Expand Down
1 change: 1 addition & 0 deletions viadot/sources/sap_bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, credentials: dict, *args, **kwargs):
Raises:
CredentialError: If provided credentials are incorrect.
"""
self.credentials = credentials
if credentials is None:
raise CredentialError("Missing credentials.")

Expand Down
12 changes: 7 additions & 5 deletions viadot/tasks/sap_bw.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pandas as pd
from prefect import Task
from prefect.tasks.secrets import PrefectSecret
from viadot.tasks import AzureKeyVaultSecret
from prefect.utilities import logging

from viadot.exceptions import ValidationError
Expand All @@ -27,13 +28,14 @@ def __init__(
sapbw_credentials_key (str, optional): Azure KV secret. Defaults to "SAP".
env (str, optional): SAP environment. Defaults to "BW".
"""
if sapbw_credentials is None:
self.sapbw_credentials = credentials_loader.run(
credentials_secret=sapbw_credentials_key
).get(env)
if not sapbw_credentials:
credentials_str = AzureKeyVaultSecret(
sapbw_credentials_key,
).run()
self.sapbw_credentials = json.loads(credentials_str).get(env)

else:
self.sapbw_credentials = sapbw_credentials
self.sapbw_credentials = PrefectSecret("SAP_BW").run()

super().__init__(
name="sapbw_to_df",
Expand Down

0 comments on commit a293f0f

Please sign in to comment.