diff --git a/viadot/flows/aselite_to_adls.py b/viadot/flows/aselite_to_adls.py index bd77cf40f..61a91c963 100644 --- a/viadot/flows/aselite_to_adls.py +++ b/viadot/flows/aselite_to_adls.py @@ -115,5 +115,8 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + create_csv.set_upstream(validation_task, flow=self) + create_csv.set_upstream(df, flow=self) adls_upload.set_upstream(create_csv, flow=self) diff --git a/viadot/flows/bigquery_to_adls.py b/viadot/flows/bigquery_to_adls.py index 935b3f7a1..e09981ebe 100644 --- a/viadot/flows/bigquery_to_adls.py +++ b/viadot/flows/bigquery_to_adls.py @@ -197,6 +197,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_with_metadata, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/cloud_for_customers_report_to_adls.py b/viadot/flows/cloud_for_customers_report_to_adls.py index 11b3e92bf..386a7224e 100644 --- a/viadot/flows/cloud_for_customers_report_to_adls.py +++ b/viadot/flows/cloud_for_customers_report_to_adls.py @@ -203,8 +203,8 @@ def gen_flow(self) -> Flow: ) if self.validate_df_dict: - validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) - validation.set_upstream(df, flow=self) + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) @@ -232,6 +232,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) df_to_file.set_upstream(df_with_metadata, flow=self) file_to_adls_task.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 6c54a0704..39225330c 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -241,6 +241,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/eurostat_to_adls.py b/viadot/flows/eurostat_to_adls.py index e6c76a084..08a8677d6 100644 --- a/viadot/flows/eurostat_to_adls.py +++ b/viadot/flows/eurostat_to_adls.py @@ -178,6 +178,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/hubspot_to_adls.py b/viadot/flows/hubspot_to_adls.py index 7a2fe4387..87f5e2504 100644 --- a/viadot/flows/hubspot_to_adls.py +++ b/viadot/flows/hubspot_to_adls.py @@ -183,6 +183,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_viadot_downloaded.set_upstream(validation_task, flow=self) + df_viadot_downloaded.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_viadot_downloaded, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/mediatool_to_adls.py b/viadot/flows/mediatool_to_adls.py index c4e92432b..156cfeef5 100644 --- a/viadot/flows/mediatool_to_adls.py +++ b/viadot/flows/mediatool_to_adls.py @@ -173,6 +173,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/mysql_to_adls.py b/viadot/flows/mysql_to_adls.py index afe594e47..c8c4c9c44 100644 --- a/viadot/flows/mysql_to_adls.py +++ b/viadot/flows/mysql_to_adls.py @@ -105,5 +105,8 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + create_csv.set_upstream(validation_task, flow=self) + create_csv.set_upstream(df, flow=self) adls_upload.set_upstream(create_csv, flow=self) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index 606674ba1..dfeef1302 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -138,6 +138,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) df_to_file.set_upstream(df_with_metadata, flow=self) file_to_adls_task.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/salesforce_to_adls.py b/viadot/flows/salesforce_to_adls.py index 1ace9aa5a..11dec54fd 100644 --- a/viadot/flows/salesforce_to_adls.py +++ b/viadot/flows/salesforce_to_adls.py @@ -189,6 +189,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_clean.set_upstream(validation_task, flow=self) + df_clean.set_upstream(df, flow=self) df_with_metadata.set_upstream(df_clean, flow=self) dtypes_dict.set_upstream(df_with_metadata, flow=self) diff --git a/viadot/flows/sap_bw_to_adls.py b/viadot/flows/sap_bw_to_adls.py index 9619df98e..90b965f92 100644 --- a/viadot/flows/sap_bw_to_adls.py +++ b/viadot/flows/sap_bw_to_adls.py @@ -166,6 +166,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_viadot_downloaded.set_upstream(validation_task, flow=self) + df_viadot_downloaded.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_viadot_downloaded, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/sap_rfc_to_adls.py b/viadot/flows/sap_rfc_to_adls.py index c2bc71d8d..d7a2ac390 100644 --- a/viadot/flows/sap_rfc_to_adls.py +++ b/viadot/flows/sap_rfc_to_adls.py @@ -154,6 +154,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_to_file.set_upstream(validation_task, flow=self) + df_to_file.set_upstream(df, flow=self) adls_upload.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index 4b8e477dd..eaf747bab 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -124,8 +124,8 @@ def gen_flow(self) -> Flow: ) if self.validate_df_dict: - validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) - validation.set_upstream(df, flow=self) + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) @@ -168,6 +168,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_mapped.set_upstream(df_with_metadata, flow=self) dtypes_to_json_task.set_upstream(df_mapped, flow=self) df_to_file.set_upstream(dtypes_to_json_task, flow=self) @@ -316,8 +319,8 @@ def gen_flow(self) -> Flow: df = s.run() if self.validate_df_dict: - validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) - validation.set_upstream(df, flow=self) + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) @@ -353,6 +356,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_mapped.set_upstream(df_with_metadata, flow=self) dtypes_to_json_task.set_upstream(df_mapped, flow=self) df_to_file.set_upstream(dtypes_to_json_task, flow=self) diff --git a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 5a104cff7..80253eb88 100644 --- a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -315,6 +315,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + write_json.set_upstream(df, flow=self) validation.set_upstream(write_json, flow=self) df_with_metadata.set_upstream(validation_upstream, flow=self) diff --git a/viadot/flows/vid_club_to_adls.py b/viadot/flows/vid_club_to_adls.py index df9c0d531..40f53d8ae 100644 --- a/viadot/flows/vid_club_to_adls.py +++ b/viadot/flows/vid_club_to_adls.py @@ -204,6 +204,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path)