From 9cead23459a5918232cd4562f4c51952968d24bc Mon Sep 17 00:00:00 2001 From: gwieloch Date: Wed, 25 Oct 2023 13:44:13 +0200 Subject: [PATCH] added validate_df task to VidClubToADLS flow --- .../integration/flows/test_vidclub_to_adls.py | 61 +++++++++++++++++++ viadot/flows/vid_club_to_adls.py | 13 ++++ 2 files changed, 74 insertions(+) diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py index a0c86c2ec..c18eaad10 100644 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -5,9 +5,11 @@ import pytest from viadot.flows import VidClubToADLS +from viadot.exceptions import ValidationError DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} ADLS_FILE_NAME = "test_vid_club.parquet" +ADLS_FILE_NAME2 = "test_vid_club_validate_df.parquet" ADLS_DIR_PATH = "raw/test/" @@ -29,3 +31,62 @@ def test_vidclub_to_adls_run_flow(mocked_class): assert result.is_successful() os.remove("test_vidclub_to_adls_flow_run.parquet") os.remove("test_vidclub_to_adls_flow_run.json") + + +def test_vidclub_validate_df_task_success(caplog): + flow = VidClubToADLS( + "test_vidclub_validate_df_task_success", + source="product", + cols_to_drop=[ + "submissionProductID", + "submissionProductDate", + "brand", + "productCode", + ], + from_date="2023-10-25", + to_date="2023-10-25", + adls_dir_path="raw/tests", + adls_file_name="test.parquet", + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + overwrite_adls=True, + validate_df_dict={ + "column_size": {"submissionID": 5}, + "column_list_to_match": [ + "submissionID", + "regionID", + "productQuantity", + "unit", + ], + }, + ) + + result = flow.run() + assert result.is_successful() + + +def test_vidclub_validate_df_task_fail(caplog): + flow = VidClubToADLS( + "test_vidclub_validate_df_task_fail", + source="product", + cols_to_drop=[ + "submissionProductID", + "submissionProductDate", + "brand", + "productCode", + ], + from_date="2023-10-25", + to_date="2023-10-25", + adls_dir_path="raw/tests", + adls_file_name="test.parquet", + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + overwrite_adls=True, + validate_df_dict={ + "column_size": {"submissionID": 5}, + "column_unique_values": ["regionID"], + }, + ) + + try: + flow.run() + except ValidationError: + pass diff --git a/viadot/flows/vid_club_to_adls.py b/viadot/flows/vid_club_to_adls.py index 59d676c51..df9c0d531 100644 --- a/viadot/flows/vid_club_to_adls.py +++ b/viadot/flows/vid_club_to_adls.py @@ -16,6 +16,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, VidClubToDF @@ -44,6 +45,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any] @@ -75,6 +77,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ # VidClubToDF @@ -89,6 +93,9 @@ def __init__( self.vidclub_credentials_secret = vidclub_credentials_secret self.vidclub_vault_name = vidclub_vault_name + # validate_df + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.adls_file_name = adls_file_name self.adls_dir_path = adls_dir_path @@ -147,6 +154,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + vid_club_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(vid_club_df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(vid_club_df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)