Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validate_df task to OutlookToADLS flow #788

Merged
merged 6 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions tests/integration/flows/test_outlook_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import os
from unittest import mock
from datetime import date, timedelta
from prefect.tasks.secrets import PrefectSecret

import pandas as pd
import pytest

from viadot.flows import OutlookToADLS
from viadot.tasks import AzureDataLakeRemove

# Service-principal secret is resolved once, at import time, from Prefect's
# secret store; all tests below reuse the same credential.
ADLS_CREDENTIAL_SECRET = PrefectSecret(
    "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
ADLS_FILE_NAME = "test_outlook_to_adls.parquet"
ADLS_DIR_PATH = "raw/tests/"

# Query window covers yesterday through today, as "YYYY-MM-DD" strings
# (date.isoformat() produces exactly that format).
start_date = (date.today() - timedelta(days=1)).isoformat()
end_date = date.today().isoformat()

mailbox_list = ["[email protected]"]

# Minimal frame handed back by the mocked OutlookToDF task in the
# validate-success test; columns must match its validation rule.
DATA = {
    "sender": ["[email protected]"],
    "receivers": ["[email protected]"],
}


def test_outlook_to_adls_flow_run():
    """Smoke test: a plain OutlookToADLS run completes successfully.

    Exercises the real flow (no mocks) against the test mailbox over the
    yesterday-to-today window and uploads the result to the test ADLS path.
    """
    flow_kwargs = dict(
        name="Test OutlookToADLS flow run",
        mailbox_list=mailbox_list,
        outbox_list=["Outbox", "Sent Items"],
        start_date=start_date,
        end_date=end_date,
        local_file_path=ADLS_FILE_NAME,
        adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
        if_exists="replace",
        timeout=4400,
    )
    state = OutlookToADLS(**flow_kwargs).run()
    assert state.is_successful()


def test_outlook_to_adls_run_flow_validate_fail():
    """The flow must FAIL when validate_df is given a non-matching column list.

    Passing ``validation_df_dict`` with columns the Outlook frame cannot
    contain triggers the validation task, which should fail the whole flow.
    """
    flow_kwargs = dict(
        name="Test OutlookToADLS validate flow df fail",
        mailbox_list=mailbox_list,
        outbox_list=["Outbox", "Sent Items"],
        start_date=start_date,
        end_date=end_date,
        local_file_path=ADLS_FILE_NAME,
        adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
        if_exists="replace",
        validation_df_dict={"column_list_to_match": ["test", "wrong", "columns"]},
        timeout=4400,
    )
    state = OutlookToADLS(**flow_kwargs).run()
    assert state.is_failed()


@mock.patch(
    "viadot.tasks.OutlookToDF.run",
    return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_outlook_to_adls_run_flow_validate_success(mocked_task):
    """The flow succeeds when validate_df's column list matches the frame.

    OutlookToDF.run is mocked to return a two-column frame (``sender``,
    ``receivers``), so the validation rule matches and the flow must succeed.
    Local artifacts and the uploaded ADLS file are cleaned up afterwards.

    Args:
        mocked_task: mock.patch-injected replacement for OutlookToDF.run.
    """
    flow = OutlookToADLS(
        name="Test OutlookToADLS validate flow df success",
        mailbox_list=mailbox_list,
        outbox_list=["Outbox", "Sent Items"],
        start_date=start_date,
        end_date=end_date,
        local_file_path=ADLS_FILE_NAME,
        adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
        if_exists="replace",
        validation_df_dict={"column_list_to_match": ["sender", "receivers"]},
        timeout=4400,
    )

    try:
        result = flow.run()
        assert result.is_successful()
    finally:
        # Clean up even when the assertion fails, so artifacts do not leak
        # between test runs. Skip files a failed run never created, so a
        # FileNotFoundError cannot mask the original test failure.
        for leftover in (ADLS_FILE_NAME, "romania_tehnic.csv", "o365_token.txt"):
            if os.path.exists(leftover):
                os.remove(leftover)
        # Also drop the uploaded file from ADLS (best effort; the remove
        # task handles the remote side).
        rm = AzureDataLakeRemove(
            path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
        )
        rm.run(sp_credentials_secret=ADLS_CREDENTIAL_SECRET)
12 changes: 12 additions & 0 deletions viadot/flows/outlook_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
df_to_csv,
df_to_parquet,
union_dfs_task,
validate_df,
)
from viadot.tasks import AzureDataLakeUpload, OutlookToDF

Expand All @@ -29,6 +30,7 @@ def __init__(
limit: int = 10000,
timeout: int = 3600,
if_exists: Literal["append", "replace", "skip"] = "append",
validation_df_dict: dict = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_df_dict: dict = None,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

outlook_credentials_secret: str = "OUTLOOK",
*args: List[Any],
**kwargs: Dict[str, Any],
Expand All @@ -54,6 +56,8 @@ def __init__(
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append".
validation_df_dict (dict, optional): An optional dictionary to verify the received dataframe.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_df_dict

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

When passed, `validate_df` task validation tests are triggered. Defaults to None.
"""

self.mailbox_list = mailbox_list
Expand All @@ -65,6 +69,9 @@ def __init__(
self.local_file_path = local_file_path
self.if_exsists = if_exists

# Validate DataFrame
self.validation_df_dict = validation_df_dict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


# AzureDataLakeUpload
self.adls_file_path = adls_file_path
self.output_file_extension = output_file_extension
Expand Down Expand Up @@ -98,6 +105,11 @@ def gen_flow(self) -> Flow:
dfs = apply_map(self.gen_outlook_df, self.mailbox_list, flow=self)

df = union_dfs_task.bind(dfs, flow=self)

if self.validation_df_dict:
validation = validate_df(df=df, tests=self.validation_df_dict, flow=self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

validation.set_upstream(df, flow=self)

df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)

if self.output_file_extension == ".parquet":
Expand Down
Loading