From d0f33a9953125f6fe95d94240390b52bcb1bcb44 Mon Sep 17 00:00:00 2001
From: burzekj
Date: Tue, 12 Dec 2023 14:08:24 +0100
Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=90=9B=20fixed=20github=20action=20bu?=
 =?UTF-8?q?g=20related=20to=20black=20formatter?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .github/workflows/build.yml | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 006f22e39..865087f6a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -64,18 +64,6 @@ jobs:
       run: |
         pip install black
         black --check .
-      continue-on-error: true
-
-    - name: Commit Black changes to the pull request
-      if: ${{ always() && steps.blackCheck.outcome == 'failure' }}
-      run: |
-        git config --global user.name 'github-actions[bot]'
-        git config --global user.email 'github-actions[bot]@users.noreply.github.com'
-        git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY
-        black .
-        git checkout $GITHUB_HEAD_REF
-        git commit -am "🎨 Format Python code with Black"
-        git push

     - name: Test with pytest
       if: always()

From 2883a94cc153e88fa5bce4b9b5a207e99a3bb24b Mon Sep 17 00:00:00 2001
From: burzekj
Date: Tue, 12 Dec 2023 18:03:53 +0100
Subject: [PATCH 2/3] =?UTF-8?q?=E2=9C=A8=20Extended=20logic=20for=20if=5Fe?=
 =?UTF-8?q?mpty=20param=20in=20ADLSToAzureSQL=20flow?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 CHANGELOG.md                      |   1 +
 viadot/flows/adls_to_azure_sql.py | 166 ++++++++++++++++--------------
 2 files changed, 87 insertions(+), 80 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a30ad98b8..80c1882e1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 ### Added
+- Added logic for the `if_empty` param: `check_if_df_empty` task in the `ADLSToAzureSQL` flow
 
 ### Fixed
 
diff --git a/viadot/flows/adls_to_azure_sql.py b/viadot/flows/adls_to_azure_sql.py
index a9e49c6b6..0951efaa6 100644
--- a/viadot/flows/adls_to_azure_sql.py
+++ b/viadot/flows/adls_to_azure_sql.py
@@ -3,7 +3,7 @@
 from typing import Any, Dict, List, Literal
 
 import pandas as pd
-from prefect import Flow, task
+from prefect import Flow, task, case
 from prefect.backend import get_key_value
 from prefect.engine import signals
 from prefect.utilities import logging
@@ -18,6 +18,7 @@
     DownloadGitHubFile,
 )
 from viadot.tasks.azure_data_lake import AzureDataLakeDownload
+from viadot.task_utils import check_if_df_empty
 
 logger = logging.get_logger(__name__)
 
@@ -136,7 +137,7 @@ def __init__(
         write_sep: str = "\t",
         remove_tab: bool = False,
         overwrite_adls: bool = True,
-        if_empty: str = "warn",
+        if_empty: Literal["fail", "warn", "skip"] = "skip",
         adls_sp_credentials_secret: str = None,
         dtypes: Dict[str, Any] = None,
         check_dtypes_order: bool = True,
@@ -166,7 +167,7 @@ def __init__(
             write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
             remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
             overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
-            if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
+            if_empty (str, optional): What to do if the loaded DataFrame is empty. Defaults to "skip".
             adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing
                 a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID,
                 CLIENT_SECRET) for the Azure Data Lake. Defaults to None.
@@ -274,91 +275,96 @@ def gen_flow(self) -> Flow:
             flow=self,
         )
 
-        if not self.dtypes:
-            download_json_file_task = AzureDataLakeDownload(timeout=self.timeout)
-            download_json_file_task.bind(
-                from_path=self.json_shema_path,
-                to_path=self.local_json_path,
+        df_empty = check_if_df_empty.bind(df, self.if_empty, flow=self)
+
+        with case(df_empty, False):
+            if not self.dtypes:
+                download_json_file_task = AzureDataLakeDownload(timeout=self.timeout)
+                download_json_file_task.bind(
+                    from_path=self.json_shema_path,
+                    to_path=self.local_json_path,
+                    sp_credentials_secret=self.adls_sp_credentials_secret,
+                    flow=self,
+                )
+                dtypes = map_data_types_task.bind(self.local_json_path, flow=self)
+                map_data_types_task.set_upstream(download_json_file_task, flow=self)
+            else:
+                dtypes = check_dtypes_sort.bind(
+                    df,
+                    dtypes=self.dtypes,
+                    apply=self.check_dtypes_order,
+                    flow=self,
+                )
+
+            check_column_order_task = CheckColumnOrder(timeout=self.timeout)
+            df_reorder = check_column_order_task.bind(
+                table=self.table,
+                schema=self.schema,
+                df=df,
+                if_exists=self.if_exists,
+                credentials_secret=self.sqldb_credentials_secret,
+                flow=self,
+            )
+            if not self.check_col_order:
+                df_to_csv = df_to_csv_task.bind(
+                    df=df,
+                    path=self.local_file_path,
+                    sep=self.write_sep,
+                    remove_tab=self.remove_tab,
+                    flow=self,
+                )
+            else:
+                df_to_csv = df_to_csv_task.bind(
+                    df=df_reorder,
+                    path=self.local_file_path,
+                    sep=self.write_sep,
+                    remove_tab=self.remove_tab,
+                    flow=self,
+                )
+
+            promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
+            promote_to_conformed_task.bind(
+                from_path=self.adls_path,
+                to_path=self.adls_path_conformed,
                 sp_credentials_secret=self.adls_sp_credentials_secret,
+                vault_name=self.vault_name,
                 flow=self,
             )
-            dtypes = map_data_types_task.bind(self.local_json_path, flow=self)
-            map_data_types_task.set_upstream(download_json_file_task, flow=self)
-        else:
-            dtypes = check_dtypes_sort.bind(
-                df,
-                dtypes=self.dtypes,
-                apply=self.check_dtypes_order,
+            promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
+            promote_to_operations_task.bind(
+                from_path=self.adls_path_conformed,
+                to_path=self.adls_path_operations,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                vault_name=self.vault_name,
                 flow=self,
             )
-
-        check_column_order_task = CheckColumnOrder(timeout=self.timeout)
-        df_reorder = check_column_order_task.bind(
-            table=self.table,
-            schema=self.schema,
-            df=df,
-            if_exists=self.if_exists,
-            credentials_secret=self.sqldb_credentials_secret,
-            flow=self,
-        )
-        if self.check_col_order == False:
-            df_to_csv = df_to_csv_task.bind(
-                df=df,
-                path=self.local_file_path,
-                sep=self.write_sep,
-                remove_tab=self.remove_tab,
+            create_table_task = AzureSQLCreateTable(timeout=self.timeout)
+            create_table_task.bind(
+                schema=self.schema,
+                table=self.table,
+                dtypes=dtypes,
+                if_exists=self._map_if_exists(self.if_exists),
+                credentials_secret=self.sqldb_credentials_secret,
+                vault_name=self.vault_name,
                 flow=self,
             )
-        else:
-            df_to_csv = df_to_csv_task.bind(
-                df=df_reorder,
+            bulk_insert_task = BCPTask(timeout=self.timeout)
+            bulk_insert_task.bind(
                 path=self.local_file_path,
-                sep=self.write_sep,
-                remove_tab=self.remove_tab,
+                schema=self.schema,
+                table=self.table,
+                error_log_file_path=self.name.replace(" ", "_") + ".log",
+                on_error=self.on_bcp_error,
+                credentials_secret=self.sqldb_credentials_secret,
+                vault_name=self.vault_name,
                 flow=self,
             )
 
-        promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_conformed_task.bind(
-            from_path=self.adls_path,
-            to_path=self.adls_path_conformed,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_operations_task.bind(
-            from_path=self.adls_path_conformed,
-            to_path=self.adls_path_operations,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        create_table_task = AzureSQLCreateTable(timeout=self.timeout)
-        create_table_task.bind(
-            schema=self.schema,
-            table=self.table,
-            dtypes=dtypes,
-            if_exists=self._map_if_exists(self.if_exists),
-            credentials_secret=self.sqldb_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        bulk_insert_task = BCPTask(timeout=self.timeout)
-        bulk_insert_task.bind(
-            path=self.local_file_path,
-            schema=self.schema,
-            table=self.table,
-            error_log_file_path=self.name.replace(" ", "_") + ".log",
-            on_error=self.on_bcp_error,
-            credentials_secret=self.sqldb_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-
-        df_reorder.set_upstream(lake_to_df_task, flow=self)
-        df_to_csv.set_upstream(df_reorder, flow=self)
-        promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
-        create_table_task.set_upstream(df_to_csv, flow=self)
-        promote_to_operations_task.set_upstream(promote_to_conformed_task, flow=self)
-        bulk_insert_task.set_upstream(create_table_task, flow=self)
+            df_reorder.set_upstream(lake_to_df_task, flow=self)
+            df_to_csv.set_upstream(df_reorder, flow=self)
+            promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
+            create_table_task.set_upstream(df_to_csv, flow=self)
+            promote_to_operations_task.set_upstream(
+                promote_to_conformed_task, flow=self
+            )
+            bulk_insert_task.set_upstream(create_table_task, flow=self)

From 20be35369a6e36c5b0f06a1e97e9faa5daae3eef Mon Sep 17 00:00:00 2001
From: Jakub Burzec <125436423+burzekj@users.noreply.github.com>
Date: Wed, 13 Dec 2023 11:20:36 +0100
Subject: [PATCH 3/3] Update viadot/flows/adls_to_azure_sql.py

Co-authored-by: Marcin Purtak <44641138+marcinpurtak@users.noreply.github.com>
---
 viadot/flows/adls_to_azure_sql.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/viadot/flows/adls_to_azure_sql.py b/viadot/flows/adls_to_azure_sql.py
index 0951efaa6..95b1340ca 100644
--- a/viadot/flows/adls_to_azure_sql.py
+++ b/viadot/flows/adls_to_azure_sql.py
@@ -167,7 +167,7 @@ def __init__(
             write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
             remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
             overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
-            if_empty (str, optional): What to do if the loaded DataFrame is empty. Defaults to "skip".
+            if_empty (str, optional): What to do if the loaded DataFrame is empty. Defaults to "skip", which acts as if this setting were ignored.
             adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing
                 a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID,
                 CLIENT_SECRET) for the Azure Data Lake. Defaults to None.
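---

For reviewers less familiar with Prefect 1.x conditionals, the sketch below illustrates the gating pattern that PATCH 2/3 introduces. It is a simplified, standalone approximation rather than viadot code: `extract` and `load` are hypothetical stand-ins for the flow's real tasks, and this `check_if_df_empty` only mimics the behavior of the task imported from `viadot.task_utils`.

import pandas as pd
import prefect
from prefect import Flow, case, task
from prefect.engine import signals


@task
def check_if_df_empty(df: pd.DataFrame, if_empty: str = "skip") -> bool:
    # Simplified stand-in for viadot's check_if_df_empty: returns True when the
    # DataFrame is empty, applying the if_empty policy first.
    if df.empty:
        if if_empty == "fail":
            raise signals.FAIL("The input DataFrame is empty.")
        if if_empty == "warn":
            prefect.context.get("logger").warning("The input DataFrame is empty.")
        return True
    return False


@task
def extract() -> pd.DataFrame:
    # Hypothetical stand-in for the ADLS download; returns no rows on purpose.
    return pd.DataFrame()


@task
def load(df: pd.DataFrame) -> None:
    # Hypothetical stand-in for the CSV dump / table creation / BCP insert chain.
    print(f"Loading {len(df)} rows")


with Flow("if_empty demo") as flow:
    df = extract()
    df_empty = check_if_df_empty(df, if_empty="skip")
    # Everything inside this block runs only when df_empty evaluates to False,
    # mirroring the `with case(df_empty, False):` block added to gen_flow.
    with case(df_empty, False):
        load(df)

flow.run()

With `if_empty="skip"` and an empty extract, the run finishes with the `load` branch in a Skipped state; `"fail"` raises a FAIL signal and fails the run, while `"warn"` logs a warning before skipping the downstream branch.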