
Commit

Merge branch 'dev' into validate_df_implementation
burzekj authored Jan 5, 2024
2 parents 9b7d1f7 + b7907f6 commit d5904da
Showing 3 changed files with 92 additions and 97 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/build.yml
@@ -64,18 +64,6 @@ jobs:
         run: |
           pip install black
           black --check .
-        continue-on-error: true
-
-      - name: Commit Black changes to the pull request
-        if: ${{ always() && steps.blackCheck.outcome == 'failure' }}
-        run: |
-          git config --global user.name 'github-actions[bot]'
-          git config --global user.email 'github-actions[bot]@users.noreply.github.com'
-          git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY
-          black .
-          git checkout $GITHUB_HEAD_REF
-          git commit -am "🎨 Format Python code with Black"
-          git push
       - name: Test with pytest
         if: always()
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]
 ### Added
+- Added `if_empty` parameter logic (`check_if_df_empty` task) to the `ADLSToAzureSQL` flow
 
 ### Fixed

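For context on the entry above: the flow now gates its downstream tasks on a boolean emptiness check (see the `adls_to_azure_sql.py` diff below). The task itself lives in `viadot.task_utils` and its body is not part of this commit, so the following is only a minimal sketch of how such a task might behave, assuming "fail" raises, "warn" logs and lets the flow continue, and "skip" silently short-circuits downstream work:

import logging

import pandas as pd
from prefect import task

logger = logging.getLogger(__name__)


@task
def check_if_df_empty_sketch(df: pd.DataFrame, if_empty: str = "skip") -> bool:
    # Hypothetical stand-in for viadot.task_utils.check_if_df_empty,
    # not the actual implementation. Returns True when downstream
    # tasks should be skipped.
    if df.empty:
        if if_empty == "fail":
            raise ValueError("Input DataFrame is empty.")
        if if_empty == "warn":
            # Assumption: warning does not stop the flow.
            logger.warning("Input DataFrame is empty; continuing anyway.")
            return False
        # "skip": signal the case() block not to run.
        return True
    return False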
176 changes: 91 additions & 85 deletions viadot/flows/adls_to_azure_sql.py
@@ -3,7 +3,7 @@
 from typing import Any, Dict, List, Literal
 
 import pandas as pd
-from prefect import Flow, task
+from prefect import Flow, task, case
 from prefect.backend import get_key_value
 from prefect.engine import signals
 from prefect.utilities import logging
@@ -19,6 +19,7 @@
 )
 from viadot.task_utils import validate_df
 from viadot.tasks.azure_data_lake import AzureDataLakeDownload
+from viadot.task_utils import check_if_df_empty
 
 logger = logging.get_logger(__name__)

@@ -137,7 +138,7 @@ def __init__(
         write_sep: str = "\t",
         remove_tab: bool = False,
         overwrite_adls: bool = True,
-        if_empty: str = "warn",
+        if_empty: Literal["fail", "warn", "skip"] = "skip",
         adls_sp_credentials_secret: str = None,
         dtypes: Dict[str, Any] = None,
         check_dtypes_order: bool = True,
@@ -168,7 +169,7 @@
             write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
             remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
             overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
-            if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
+            if_empty (str, optional): What to do if the loaded DataFrame is empty. Defaults to "skip", which acts as if the setting were ignored.
             adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
                 ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
                 Defaults to None.
@@ -279,96 +280,101 @@ def gen_flow(self) -> Flow:
             flow=self,
         )
 
-        if not self.dtypes:
-            download_json_file_task = AzureDataLakeDownload(timeout=self.timeout)
-            download_json_file_task.bind(
-                from_path=self.json_shema_path,
-                to_path=self.local_json_path,
-                sp_credentials_secret=self.adls_sp_credentials_secret,
-                flow=self,
-            )
-            dtypes = map_data_types_task.bind(self.local_json_path, flow=self)
-            map_data_types_task.set_upstream(download_json_file_task, flow=self)
-        else:
-            dtypes = check_dtypes_sort.bind(
-                df,
-                dtypes=self.dtypes,
-                apply=self.check_dtypes_order,
-                flow=self,
-            )
-
-        check_column_order_task = CheckColumnOrder(timeout=self.timeout)
-        df_reorder = check_column_order_task.bind(
-            table=self.table,
-            schema=self.schema,
-            df=df,
-            if_exists=self.if_exists,
-            credentials_secret=self.sqldb_credentials_secret,
-            flow=self,
-        )
-        if self.check_col_order == False:
-            df_to_csv = df_to_csv_task.bind(
-                df=df,
-                path=self.local_file_path,
-                sep=self.write_sep,
-                remove_tab=self.remove_tab,
-                flow=self,
-            )
-        else:
-            df_to_csv = df_to_csv_task.bind(
-                df=df_reorder,
-                path=self.local_file_path,
-                sep=self.write_sep,
-                remove_tab=self.remove_tab,
-                flow=self,
-            )
-
-        promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_conformed_task.bind(
-            from_path=self.adls_path,
-            to_path=self.adls_path_conformed,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_operations_task.bind(
-            from_path=self.adls_path_conformed,
-            to_path=self.adls_path_operations,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        create_table_task = AzureSQLCreateTable(timeout=self.timeout)
-        create_table_task.bind(
-            schema=self.schema,
-            table=self.table,
-            dtypes=dtypes,
-            if_exists=self._map_if_exists(self.if_exists),
-            credentials_secret=self.sqldb_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        bulk_insert_task = BCPTask(timeout=self.timeout)
-        bulk_insert_task.bind(
-            path=self.local_file_path,
-            schema=self.schema,
-            table=self.table,
-            error_log_file_path=self.name.replace(" ", "_") + ".log",
-            on_error=self.on_bcp_error,
-            credentials_secret=self.sqldb_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-
-        # data validation function (optional)
-        if self.validate_df_dict:
-            validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
-            validate_df.set_upstream(lake_to_df_task, flow=self)
-
-        df_reorder.set_upstream(lake_to_df_task, flow=self)
-        df_to_csv.set_upstream(df_reorder, flow=self)
-        promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
-        create_table_task.set_upstream(df_to_csv, flow=self)
-        promote_to_operations_task.set_upstream(promote_to_conformed_task, flow=self)
-        bulk_insert_task.set_upstream(create_table_task, flow=self)
+        df_empty = check_if_df_empty.bind(df, self.if_empty, flow=self)
+
+        with case(df_empty, False):
+            if not self.dtypes:
+                download_json_file_task = AzureDataLakeDownload(timeout=self.timeout)
+                download_json_file_task.bind(
+                    from_path=self.json_shema_path,
+                    to_path=self.local_json_path,
+                    sp_credentials_secret=self.adls_sp_credentials_secret,
+                    flow=self,
+                )
+                dtypes = map_data_types_task.bind(self.local_json_path, flow=self)
+                map_data_types_task.set_upstream(download_json_file_task, flow=self)
+            else:
+                dtypes = check_dtypes_sort.bind(
+                    df,
+                    dtypes=self.dtypes,
+                    apply=self.check_dtypes_order,
+                    flow=self,
+                )
+
+            check_column_order_task = CheckColumnOrder(timeout=self.timeout)
+            df_reorder = check_column_order_task.bind(
+                table=self.table,
+                schema=self.schema,
+                df=df,
+                if_exists=self.if_exists,
+                credentials_secret=self.sqldb_credentials_secret,
+                flow=self,
+            )
+            if self.check_col_order == False:
+                df_to_csv = df_to_csv_task.bind(
+                    df=df,
+                    path=self.local_file_path,
+                    sep=self.write_sep,
+                    remove_tab=self.remove_tab,
+                    flow=self,
+                )
+            else:
+                df_to_csv = df_to_csv_task.bind(
+                    df=df_reorder,
+                    path=self.local_file_path,
+                    sep=self.write_sep,
+                    remove_tab=self.remove_tab,
+                    flow=self,
+                )
+
+            promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
+            promote_to_conformed_task.bind(
+                from_path=self.adls_path,
+                to_path=self.adls_path_conformed,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                vault_name=self.vault_name,
+                flow=self,
+            )
+            promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
+            promote_to_operations_task.bind(
+                from_path=self.adls_path_conformed,
+                to_path=self.adls_path_operations,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                vault_name=self.vault_name,
+                flow=self,
+            )
+            create_table_task = AzureSQLCreateTable(timeout=self.timeout)
+            create_table_task.bind(
+                schema=self.schema,
+                table=self.table,
+                dtypes=dtypes,
+                if_exists=self._map_if_exists(self.if_exists),
+                credentials_secret=self.sqldb_credentials_secret,
+                vault_name=self.vault_name,
+                flow=self,
+            )
+            bulk_insert_task = BCPTask(timeout=self.timeout)
+            bulk_insert_task.bind(
+                path=self.local_file_path,
+                schema=self.schema,
+                table=self.table,
+                error_log_file_path=self.name.replace(" ", "_") + ".log",
+                on_error=self.on_bcp_error,
+                credentials_secret=self.sqldb_credentials_secret,
+                vault_name=self.vault_name,
+                flow=self,
+            )
+
+            # data validation function (optional)
+            if self.validate_df_dict:
+                validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
+                validate_df.set_upstream(lake_to_df_task, flow=self)
+
+            df_reorder.set_upstream(lake_to_df_task, flow=self)
+            df_to_csv.set_upstream(df_reorder, flow=self)
+            promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
+            create_table_task.set_upstream(df_to_csv, flow=self)
+            promote_to_operations_task.set_upstream(
+                promote_to_conformed_task, flow=self
+            )
+            bulk_insert_task.set_upstream(create_table_task, flow=self)
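The structural change in `gen_flow` is Prefect 1.x's `case` context manager: every task bound inside `with case(df_empty, False):` gets a conditional dependency and only runs when `check_if_df_empty` returns False. A self-contained sketch of the pattern, with illustrative task names that are not part of viadot:

import pandas as pd
from prefect import Flow, case, task


@task
def load_df() -> pd.DataFrame:
    # Stand-in for the ADLS download; returns a non-empty frame here.
    return pd.DataFrame({"a": [1, 2, 3]})


@task
def is_empty(df: pd.DataFrame) -> bool:
    return df.empty


@task
def persist(df: pd.DataFrame) -> None:
    print(f"Persisting {len(df)} rows")


with Flow("case-gating-sketch") as flow:
    df = load_df()
    empty = is_empty(df)
    # Tasks created inside the case block run only when `empty` is False.
    with case(empty, False):
        persist(df)

flow.run()  # persist() executes; with an empty frame it would end Skipped

In Prefect 1.x, tasks bypassed by `case` finish in a Skipped state, which counts as a successful flow run, so an empty extract no longer has to fail the whole pipeline.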
