Skip to content

Commit

Permalink
Merge pull request #782 from dyvenia/genesys_validate_df
Browse files Browse the repository at this point in the history
Genesys validate_df
  • Loading branch information
m-paz authored Oct 26, 2023
2 parents 20db130 + 5e49816 commit 96066e4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
7 changes: 6 additions & 1 deletion viadot/flows/genesys_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd
from prefect import Flow, task

from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload
from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload, validate_df
from viadot.tasks.genesys import GenesysToCSV


Expand Down Expand Up @@ -95,6 +95,7 @@ def __init__(
overwrite_adls: bool = True,
adls_sp_credentials_secret: str = None,
credentials_genesys: Dict[str, Any] = None,
validate_df_dict: Dict[str, Any] = None,
timeout: int = 3600,
*args: List[any],
**kwargs: Dict[str, Any],
Expand Down Expand Up @@ -143,6 +144,8 @@ def __init__(
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
credentials(dict, optional): Credentials for the genesys api. Defaults to None.
validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers
the `validate_df` task from task_utils. Defaults to None.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
Expand All @@ -165,6 +168,7 @@ def __init__(
self.start_date = start_date
self.end_date = end_date
self.sep = sep
self.validate_df_dict = validate_df_dict
self.timeout = timeout

# AzureDataLake
Expand All @@ -183,6 +187,7 @@ def gen_flow(self) -> Flow:
timeout=self.timeout,
local_file_path=self.local_file_path,
sep=self.sep,
validate_df_dict=self.validate_df_dict,
)

file_names = to_csv.bind(
Expand Down
14 changes: 13 additions & 1 deletion viadot/tasks/genesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from prefect.engine import signals
from prefect.utilities import logging
from prefect.utilities.tasks import defaults_from_attrs
from viadot.task_utils import *

from viadot.exceptions import APIError
from viadot.sources import Genesys
Expand All @@ -33,6 +34,7 @@ def __init__(
conversationId_list: List[str] = None,
key_list: List[str] = None,
credentials_genesys: Dict[str, Any] = None,
validate_df_dict: Dict[str, Any] = None,
timeout: int = 3600,
*args: List[Any],
**kwargs: Dict[str, Any],
Expand All @@ -54,6 +56,8 @@ def __init__(
sep (str, optional): Separator in csv file. Defaults to "\t".
conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None.
key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None.
validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers
the `validate_df` task from task_utils. Defaults to None.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
Expand All @@ -72,6 +76,7 @@ def __init__(
self.sep = sep
self.conversationId_list = conversationId_list
self.key_list = key_list
self.validate_df_dict = validate_df_dict

super().__init__(
name=self.report_name,
Expand Down Expand Up @@ -293,6 +298,7 @@ def merge_conversations_dfs(self, data_to_merge: list) -> DataFrame:
"credentials_genesys",
"conversationId_list",
"key_list",
"validate_df_dict",
)
def run(
self,
Expand All @@ -309,6 +315,7 @@ def run(
conversationId_list: List[str] = None,
key_list: List[str] = None,
credentials_genesys: Dict[str, Any] = None,
validate_df_dict: Dict[str, Any] = None,
) -> List[str]:
"""
Task for downloading data from the Genesys API to DF.
Expand All @@ -327,6 +334,8 @@ def run(
report_columns (List[str], optional): List of exisiting column in report. Defaults to None.
conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None.
key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None.
validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers
the `validate_df` task from task_utils. Defaults to None.
Returns:
List[str]: List of file names.
Expand Down Expand Up @@ -450,7 +459,8 @@ def run(

date = start_date.replace("-", "")
file_name = f"conversations_detail_{date}".upper() + ".csv"

if validate_df_dict:
validate_df.run(df=final_df, tests=validate_df_dict)
final_df.to_csv(
os.path.join(self.local_file_path, file_name),
index=False,
Expand Down Expand Up @@ -488,6 +498,8 @@ def run(
end = end_date.replace("-", "")

file_name = f"WEBMESSAGE_{start}-{end}.csv"
if validate_df_dict:
validate_df.run(df=df, tests=validate_df_dict)
df.to_csv(
os.path.join(file_name),
index=False,
Expand Down

0 comments on commit 96066e4

Please sign in to comment.