Skip to content

Commit

Permalink
added order for validation_task for viadot flows
Browse files Browse the repository at this point in the history
  • Loading branch information
gwieloch committed Oct 26, 2023
1 parent ef1900e commit 4338f55
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 6 deletions.
3 changes: 3 additions & 0 deletions viadot/flows/aselite_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,8 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
create_csv.set_upstream(validation_task, flow=self)

create_csv.set_upstream(df, flow=self)
adls_upload.set_upstream(create_csv, flow=self)
3 changes: 3 additions & 0 deletions viadot/flows/bigquery_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

df_with_metadata.set_upstream(df, flow=self)
dtypes_dict.set_upstream(df_with_metadata, flow=self)
df_to_be_loaded.set_upstream(dtypes_dict, flow=self)
Expand Down
7 changes: 5 additions & 2 deletions viadot/flows/cloud_for_customers_report_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ def gen_flow(self) -> Flow:
)

if self.validate_df_dict:
validation = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation.set_upstream(df, flow=self)
validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation_task.set_upstream(df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)

Expand Down Expand Up @@ -232,6 +232,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

df_with_metadata.set_upstream(df, flow=self)
df_to_file.set_upstream(df_with_metadata, flow=self)
file_to_adls_task.set_upstream(df_to_file, flow=self)
3 changes: 3 additions & 0 deletions viadot/flows/customer_gauge_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

file_to_adls_task.set_upstream(df_to_file, flow=self)
json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
3 changes: 3 additions & 0 deletions viadot/flows/eurostat_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

file_to_adls_task.set_upstream(df_to_file, flow=self)
json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
3 changes: 3 additions & 0 deletions viadot/flows/hubspot_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_viadot_downloaded.set_upstream(validation_task, flow=self)

df_viadot_downloaded.set_upstream(df, flow=self)
dtypes_dict.set_upstream(df_viadot_downloaded, flow=self)
df_to_be_loaded.set_upstream(dtypes_dict, flow=self)
Expand Down
3 changes: 3 additions & 0 deletions viadot/flows/mediatool_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

file_to_adls_task.set_upstream(df_to_file, flow=self)
json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
3 changes: 3 additions & 0 deletions viadot/flows/mysql_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,8 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
create_csv.set_upstream(validation_task, flow=self)

create_csv.set_upstream(df, flow=self)
adls_upload.set_upstream(create_csv, flow=self)
3 changes: 3 additions & 0 deletions viadot/flows/outlook_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

df_with_metadata.set_upstream(df, flow=self)
df_to_file.set_upstream(df_with_metadata, flow=self)
file_to_adls_task.set_upstream(df_to_file, flow=self)
3 changes: 3 additions & 0 deletions viadot/flows/salesforce_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_clean.set_upstream(validation_task, flow=self)

df_clean.set_upstream(df, flow=self)
df_with_metadata.set_upstream(df_clean, flow=self)
dtypes_dict.set_upstream(df_with_metadata, flow=self)
Expand Down
3 changes: 3 additions & 0 deletions viadot/flows/sap_bw_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_viadot_downloaded.set_upstream(validation_task, flow=self)

df_viadot_downloaded.set_upstream(df, flow=self)
dtypes_dict.set_upstream(df_viadot_downloaded, flow=self)
df_to_be_loaded.set_upstream(dtypes_dict, flow=self)
Expand Down
3 changes: 3 additions & 0 deletions viadot/flows/sap_rfc_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_to_file.set_upstream(validation_task, flow=self)

df_to_file.set_upstream(df, flow=self)
adls_upload.set_upstream(df_to_file, flow=self)

Expand Down
14 changes: 10 additions & 4 deletions viadot/flows/sharepoint_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def gen_flow(self) -> Flow:
)

if self.validate_df_dict:
validation = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation.set_upstream(df, flow=self)
validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation_task.set_upstream(df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
Expand Down Expand Up @@ -168,6 +168,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

df_mapped.set_upstream(df_with_metadata, flow=self)
dtypes_to_json_task.set_upstream(df_mapped, flow=self)
df_to_file.set_upstream(dtypes_to_json_task, flow=self)
Expand Down Expand Up @@ -316,8 +319,8 @@ def gen_flow(self) -> Flow:
df = s.run()

if self.validate_df_dict:
validation = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation.set_upstream(df, flow=self)
validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation_task.set_upstream(df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
Expand Down Expand Up @@ -353,6 +356,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

df_mapped.set_upstream(df_with_metadata, flow=self)
dtypes_to_json_task.set_upstream(df_mapped, flow=self)
df_to_file.set_upstream(dtypes_to_json_task, flow=self)
Expand Down
3 changes: 3 additions & 0 deletions viadot/flows/supermetrics_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

write_json.set_upstream(df, flow=self)
validation.set_upstream(write_json, flow=self)
df_with_metadata.set_upstream(validation_upstream, flow=self)
Expand Down
3 changes: 3 additions & 0 deletions viadot/flows/vid_club_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ def gen_flow(self) -> Flow:
flow=self,
)

if self.validate_df_dict:
df_with_metadata.set_upstream(validation_task, flow=self)

file_to_adls_task.set_upstream(df_to_file, flow=self)
json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
set_key_value(key=self.adls_dir_path, value=self.adls_file_path)

0 comments on commit 4338f55

Please sign in to comment.