Skip to content

Commit

Permalink
updated validation_task for aselite
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikjedlinski committed Oct 25, 2023
1 parent 8f5348a commit c9cc200
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
11 changes: 2 additions & 9 deletions tests/integration/flows/test_aselite_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_aselite_to_adls():
os.remove(TMP_FILE_NAME)


def test_aselite_to_adls_validate_df():
def test_aselite_to_adls_validate_success():
credentials_secret = PrefectSecret("aselite").run()
vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()

Expand All @@ -90,14 +90,7 @@ def test_aselite_to_adls_validate_df():
validate_df_dict = {
"column_size": {"ParentLanguageNo": 1},
"column_unique_values": ["ID"],
"column_list_to_match": [
"SpracheText",
"SpracheMM",
"KatSprache",
"KatBasisSprache",
"CodePage",
],
"dataset_row_count": {"min": 10, "max": 10},
"dataset_row_count": {"min": 0, "max": 10},
"column_match_regex": {"SpracheText", r"TE_.*"},
}

Expand Down
7 changes: 4 additions & 3 deletions viadot/flows/aselite_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(
to_path: str = None,
if_exists: Literal["replace", "append", "delete"] = "replace",
overwrite: bool = True,
validate_df_dict: dict = None,
validate_df_dict: Dict[str, Any] = None,
convert_bytes: bool = False,
sp_credentials_secret: str = None,
remove_special_characters: bool = None,
Expand All @@ -47,7 +47,8 @@ def __init__(
to_path (str): The path to an ADLS file. Defaults to None.
if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
overwrite (bool, optional): Whether to overwrite the destination file. Defaults to True.
validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
validate_df_dict (Dict[str, Any], optional): A dictionary with an optional list of tests to verify the output
dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
remove_special_characters (bool, optional): Call a function that removes special characters like escape symbols. Defaults to None.
Expand Down Expand Up @@ -95,6 +96,7 @@ def gen_flow(self) -> Flow:
validation_task = validate_df.bind(
df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(df, flow=self)

create_csv = df_to_csv.bind(
df,
Expand All @@ -113,6 +115,5 @@ def gen_flow(self) -> Flow:
flow=self,
)

# validation_task.set_upstream(df, flow=self)
create_csv.set_upstream(df, flow=self)
adls_upload.set_upstream(create_csv, flow=self)

0 comments on commit c9cc200

Please sign in to comment.