Merge pull request #834 from burzekj/validate_df_implementation
`validate_df` implementation to `ADLSToAzureSQL`
Rafalz13 authored Jan 17, 2024
2 parents 0fe61b3 + d20ebcf commit 57d9535
Showing 3 changed files with 56 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added logic for the `if_empty` param: `check_if_df_empty` task in the `ADLSToAzureSQL` flow
- Added new parameter `validate_df_dict` to `ADLSToAzureSQL` class

### Fixed

45 changes: 45 additions & 0 deletions tests/integration/flows/test_adls_to_azure_sql.py
@@ -101,3 +101,48 @@ def test_check_dtypes_sort():
assert False
except signals.FAIL:
assert True


def test_adls_to_azure_sql_mocked(TEST_CSV_FILE_PATH):
with mock.patch.object(ADLSToAzureSQL, "run", return_value=True) as mock_method:
instance = ADLSToAzureSQL(
name="test_adls_to_azure_sql_flow",
adls_path=TEST_CSV_FILE_PATH,
schema="sandbox",
table="test_bcp",
dtypes={"test_str": "VARCHAR(25)", "test_int": "INT"},
if_exists="replace",
)
instance.run()
mock_method.assert_called_with()


def test_adls_to_azure_sql_mocked_validate_df_param(TEST_CSV_FILE_PATH):
with mock.patch.object(ADLSToAzureSQL, "run", return_value=True) as mock_method:
instance = ADLSToAzureSQL(
name="test_adls_to_azure_sql_flow",
adls_path=TEST_CSV_FILE_PATH,
schema="sandbox",
table="test_bcp",
dtypes={"test_str": "VARCHAR(25)", "test_int": "INT"},
if_exists="replace",
validate_df_dict={"column_list_to_match": ["test_str", "test_int"]},
)
instance.run()
mock_method.assert_called_with()


def test_adls_to_azure_sql_mocked_wrong_param(TEST_CSV_FILE_PATH):
with pytest.raises(TypeError) as excinfo:
instance = ADLSToAzureSQL(
name="test_adls_to_azure_sql_flow",
adls_path=TEST_CSV_FILE_PATH,
schema="sandbox",
table="test_bcp",
dtypes={"test_str": "VARCHAR(25)", "test_int": "INT"},
if_exists="replace",
validate_df_dit={"column_list_to_match": ["test_str", "test_int"]},
)
instance.run()

    assert "validate_df_dit" in str(excinfo.value)
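
For context, the `column_list_to_match` entry used in these tests checks that the DataFrame's columns match an expected list. Below is a minimal, self-contained sketch of that kind of check; `check_column_list_to_match` is a hypothetical helper for illustration, not the actual `validate_df` implementation in viadot.

```python
import pandas as pd


def check_column_list_to_match(df: pd.DataFrame, expected: list) -> None:
    """Raise if the DataFrame's columns, in order, differ from the expected list."""
    actual = list(df.columns)
    if actual != expected:
        raise ValueError(f"Column mismatch: expected {expected}, got {actual}")


# A frame matching the test fixtures above passes silently.
df = pd.DataFrame({"test_str": ["a"], "test_int": [1]})
check_column_list_to_match(df, ["test_str", "test_int"])
```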
10 changes: 10 additions & 0 deletions viadot/flows/adls_to_azure_sql.py
@@ -17,6 +17,7 @@
CheckColumnOrder,
DownloadGitHubFile,
)
from viadot.task_utils import check_if_df_empty, validate_df
from viadot.tasks.azure_data_lake import AzureDataLakeDownload

@@ -151,6 +152,7 @@ def __init__(
tags: List[str] = ["promotion"],
vault_name: str = None,
timeout: int = 3600,
validate_df_dict: Dict[str, Any] = None,
*args: List[any],
**kwargs: Dict[str, Any],
):
@@ -187,6 +189,8 @@ def __init__(
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
timeout (int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
validate_df_dict (Dict[str, Any], optional): A dictionary with an optional list of tests to verify the output DataFrame.
If defined, triggers the `validate_df` task from `task_utils`. Defaults to None.
"""

adls_path = adls_path.strip("/")
@@ -237,6 +241,7 @@ def __init__(
self.tags = tags
self.vault_name = vault_name
self.timeout = timeout
self.validate_df_dict = validate_df_dict

super().__init__(*args, name=name, **kwargs)

@@ -360,6 +365,11 @@ def gen_flow(self) -> Flow:
flow=self,
)

# Optional data validation task (bound only when validate_df_dict is provided)
if self.validate_df_dict:
validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
validate_df.set_upstream(lake_to_df_task, flow=self)

df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
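
As a usage sketch mirroring the mocked tests above: enabling the new validation only requires passing `validate_df_dict` when constructing the flow. The path and table names below are placeholders, and the `viadot.flows` import location is assumed.

```python
from viadot.flows import ADLSToAzureSQL

flow = ADLSToAzureSQL(
    name="adls_to_azure_sql_with_validation",
    adls_path="raw/tests/test_file.csv",  # placeholder path
    schema="sandbox",
    table="test_bcp",
    dtypes={"test_str": "VARCHAR(25)", "test_int": "INT"},
    if_exists="replace",
    # When validate_df_dict is None, gen_flow() never binds the validate_df task,
    # so existing flows are unaffected by this change.
    validate_df_dict={"column_list_to_match": ["test_str", "test_int"]},
)
flow.run()
```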
