From c9cc2005a1202395cea04500fcdfc24231b5165c Mon Sep 17 00:00:00 2001 From: dominikjedlinski Date: Wed, 25 Oct 2023 14:39:23 +0200 Subject: [PATCH] updated validation_task for aselite --- tests/integration/flows/test_aselite_to_adls.py | 11 ++--------- viadot/flows/aselite_to_adls.py | 7 ++++--- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/integration/flows/test_aselite_to_adls.py b/tests/integration/flows/test_aselite_to_adls.py index 32a31aca7..60022f47e 100644 --- a/tests/integration/flows/test_aselite_to_adls.py +++ b/tests/integration/flows/test_aselite_to_adls.py @@ -64,7 +64,7 @@ def test_aselite_to_adls(): os.remove(TMP_FILE_NAME) -def test_aselite_to_adls_validate_df(): +def test_aselite_to_adls_validate_success(): credentials_secret = PrefectSecret("aselite").run() vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run() @@ -90,14 +90,7 @@ def test_aselite_to_adls_validate_df(): validate_df_dict = { "column_size": {"ParentLanguageNo": 1}, "column_unique_values": ["ID"], - "column_list_to_match": [ - "SpracheText", - "SpracheMM", - "KatSprache", - "KatBasisSprache", - "CodePage", - ], - "dataset_row_count": {"min": 10, "max": 10}, + "dataset_row_count": {"min": 0, "max": 10}, "column_match_regex": {"SpracheText", r"TE_.*"}, } diff --git a/viadot/flows/aselite_to_adls.py b/viadot/flows/aselite_to_adls.py index 8fcdaddb3..bd77cf40f 100644 --- a/viadot/flows/aselite_to_adls.py +++ b/viadot/flows/aselite_to_adls.py @@ -24,7 +24,7 @@ def __init__( to_path: str = None, if_exists: Literal["replace", "append", "delete"] = "replace", overwrite: bool = True, - validate_df_dict: dict = None, + validate_df_dict: Dict[str, Any] = None, convert_bytes: bool = False, sp_credentials_secret: str = None, remove_special_characters: bool = None, @@ -47,7 +47,8 @@ def __init__( to_path (str): The path to an ADLS file. Defaults to None. if_exists (Literal, optional): What to do if the table exists. Defaults to "replace". 
overwrite (str, optional): Whether to overwrite the destination file. Defaults to True. - validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. + validate_df_dict (Dict[str, Any], optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. remove_special_characters (str, optional): Call a function that remove special characters like escape symbols. Defaults to None. @@ -95,6 +96,7 @@ def gen_flow(self) -> Flow: validation_task = validate_df.bind( df, tests=self.validate_df_dict, flow=self ) + validation_task.set_upstream(df, flow=self) create_csv = df_to_csv.bind( df, @@ -113,6 +115,5 @@ def gen_flow(self) -> Flow: flow=self, ) - # validation_task.set_upstream(df, flow=self) create_csv.set_upstream(df, flow=self) adls_upload.set_upstream(create_csv, flow=self)