Skip to content

Commit

Permalink
✨ Added validate_df func into ADLSToAzureSQL flow
Browse files Browse the repository at this point in the history
  • Loading branch information
burzec-dyv committed Dec 15, 2023
1 parent 48c14c0 commit 9b7d1f7
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions viadot/flows/adls_to_azure_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CheckColumnOrder,
DownloadGitHubFile,
)
from viadot.task_utils import validate_df
from viadot.tasks.azure_data_lake import AzureDataLakeDownload

logger = logging.get_logger(__name__)
Expand Down Expand Up @@ -150,6 +151,7 @@ def __init__(
tags: List[str] = ["promotion"],
vault_name: str = None,
timeout: int = 3600,
validate_df_dict: Dict[str, Any] = None,
*args: List[any],
**kwargs: Dict[str, Any],
):
Expand Down Expand Up @@ -186,6 +188,8 @@ def __init__(
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe.
If defined, triggers the `validate_df` task from task_utils. Defaults to None.
"""

adls_path = adls_path.strip("/")
Expand Down Expand Up @@ -236,6 +240,7 @@ def __init__(
self.tags = tags
self.vault_name = vault_name
self.timeout = timeout
self.validate_df_dict = validate_df_dict

super().__init__(*args, name=name, **kwargs)

Expand Down Expand Up @@ -356,6 +361,11 @@ def gen_flow(self) -> Flow:
flow=self,
)

# data validation function (optional)
if self.validate_df_dict:
validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
validate_df.set_upstream(lake_to_df_task, flow=self)

df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
Expand Down

0 comments on commit 9b7d1f7

Please sign in to comment.