Skip to content

Commit

Permalink
added validate_df task to VidClubToADLS flow
Browse files Browse the repository at this point in the history
  • Loading branch information
gwieloch committed Oct 25, 2023
1 parent 318a8fa commit 9cead23
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
61 changes: 61 additions & 0 deletions tests/integration/flows/test_vidclub_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import pytest

from viadot.flows import VidClubToADLS
from viadot.exceptions import ValidationError

DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]}
ADLS_FILE_NAME = "test_vid_club.parquet"
ADLS_FILE_NAME2 = "test_vid_club_validate_df.parquet"
ADLS_DIR_PATH = "raw/test/"


Expand All @@ -29,3 +31,62 @@ def test_vidclub_to_adls_run_flow(mocked_class):
assert result.is_successful()
os.remove("test_vidclub_to_adls_flow_run.parquet")
os.remove("test_vidclub_to_adls_flow_run.json")


def test_vidclub_validate_df_task_success(caplog):
flow = VidClubToADLS(
"test_vidclub_validate_df_task_success",
source="product",
cols_to_drop=[
"submissionProductID",
"submissionProductDate",
"brand",
"productCode",
],
from_date="2023-10-25",
to_date="2023-10-25",
adls_dir_path="raw/tests",
adls_file_name="test.parquet",
adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA",
overwrite_adls=True,
validate_df_dict={
"column_size": {"submissionID": 5},
"column_list_to_match": [
"submissionID",
"regionID",
"productQuantity",
"unit",
],
},
)

result = flow.run()
assert result.is_successful()


def test_vidclub_validate_df_task_fail(caplog):
flow = VidClubToADLS(
"test_vidclub_validate_df_task_fail",
source="product",
cols_to_drop=[
"submissionProductID",
"submissionProductDate",
"brand",
"productCode",
],
from_date="2023-10-25",
to_date="2023-10-25",
adls_dir_path="raw/tests",
adls_file_name="test.parquet",
adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA",
overwrite_adls=True,
validate_df_dict={
"column_size": {"submissionID": 5},
"column_unique_values": ["regionID"],
},
)

try:
flow.run()
except ValidationError:
pass
13 changes: 13 additions & 0 deletions viadot/flows/vid_club_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
df_to_parquet,
dtypes_to_json_task,
update_dtypes_dict,
validate_df,
)
from viadot.tasks import AzureDataLakeUpload, VidClubToDF

Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(
adls_sp_credentials_secret: str = None,
overwrite_adls: bool = False,
if_exists: str = "replace",
validate_df_dict: dict = None,
timeout: int = 3600,
*args: List[Any],
**kwargs: Dict[str, Any]
Expand Down Expand Up @@ -75,6 +77,8 @@ def __init__(
Defaults to None.
overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False.
if_exists (str, optional): What to do if the file exists. Defaults to "replace".
validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output
dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
"""
# VidClubToDF
Expand All @@ -89,6 +93,9 @@ def __init__(
self.vidclub_credentials_secret = vidclub_credentials_secret
self.vidclub_vault_name = vidclub_vault_name

# validate_df
self.validate_df_dict = validate_df_dict

# AzureDataLakeUpload
self.adls_file_name = adls_file_name
self.adls_dir_path = adls_dir_path
Expand Down Expand Up @@ -147,6 +154,12 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
validation_task = validate_df.bind(
vid_club_df, tests=self.validate_df_dict, flow=self
)
validation_task.set_upstream(vid_club_df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(vid_club_df, flow=self)

dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
Expand Down

0 comments on commit 9cead23

Please sign in to comment.