diff --git a/CHANGELOG.md b/CHANGELOG.md
index d42e3279c..fe9d467f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,12 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Added tests for new functionalities in SAPRFC and SAPRFCV2 regarding passing credentials
 - Added new params for mapping and reordering DataFrame for `Genesys` task and flow.
-
+- Added tasks to search for logs in the flow
+- Added a task to find the flow ID
+- Added tasks used to control flows in multiflows by searching for a given log from a given task
 ### Fixed

 ### Changed
+- Added `if_no_data_returned` parameter for the SharePoint list flow, which can fail or warn in case no data is returned, or skip (continue) execution in the old way
 - Changed __init__ in SAPRFC and SAPRFCV2 class in source in order to raise warning in prefect when credentials will be taken from DEV.
+
 ## [0.4.22] - 2023-11-15
 ### Added
 - Added `TM1` source class.
diff --git a/tests/integration/flows/test_sharepoint_to_adls.py b/tests/integration/flows/test_sharepoint_to_adls.py
index f0597c41a..198704aa5 100644
--- a/tests/integration/flows/test_sharepoint_to_adls.py
+++ b/tests/integration/flows/test_sharepoint_to_adls.py
@@ -5,8 +5,8 @@ import pendulum
 import pytest
 from prefect.tasks.secrets import PrefectSecret
+from viadot.flows import SharepointToADLS, SharepointListToADLS

-from viadot.flows import SharepointListToADLS, SharepointToADLS
 from viadot.tasks import AzureDataLakeRemove

 ADLS_FILE_NAME = str(pendulum.now("utc")) + ".csv"
@@ -14,9 +14,12 @@ ADLS_DIR_PATH = "raw/tests/"
 CREDENTIALS_SECRET = PrefectSecret("AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET").run()
 DATA = {"country": [1, 2], "sales": [3, 4]}
+EMPTY_DATA = {}

 # SharepointToADLS
+
+
 @mock.patch(
     "viadot.tasks.SharepointToDF.run",
     return_value=pd.DataFrame(data=DATA),
@@ -77,6 +80,8 @@ def test_sharepoint_to_adls_run_flow_overwrite_false(mocked_class):

 # SharepointListToADLS
+
+
 @mock.patch(
     "viadot.tasks.SharepointListToDF.run",
     return_value=pd.DataFrame(data=DATA),
@@ -159,3 +164,83 @@ def test_sharepoint_list_to_adls_run_flow_overwrite_true(mocked_class):
     assert result.is_successful()
     os.remove(ADLS_FILE_NAME_LIST + ".csv")
     os.remove("test_sharepoint_to_adls_run_flow_overwrite_true.json")
+
+
+@mock.patch(
+    "viadot.tasks.SharepointListToDF.run",
+    return_value=pd.DataFrame(data=EMPTY_DATA),
+)
+@pytest.mark.run
+def test_sharepoint_list_to_adls_run_flow_fail_on_no_data_returned(mocked_class):
+    """
+    Check that the flow fails when an empty DataFrame is returned
+    and if_no_data_returned = "fail".
+    No CSV file should be generated.
+    """
+    flow = SharepointListToADLS(
+        "test_sharepoint_to_adls_run_flow",
+        output_file_extension=".csv",
+        adls_sp_credentials_secret=CREDENTIALS_SECRET,
+        adls_dir_path=ADLS_DIR_PATH,
+        file_name=ADLS_FILE_NAME_LIST,
+        list_title="",
+        site_url="",
+        if_no_data_returned="fail",
+    )
+    result = flow.run()
+    assert result.is_failed()
+
+
+@mock.patch(
+    "viadot.tasks.SharepointListToDF.run",
+    return_value=pd.DataFrame(data=EMPTY_DATA),
+)
+@pytest.mark.run
+def test_sharepoint_list_to_adls_run_flow_success_on_no_data_returned(mocked_class):
+    """
+    Check that the flow succeeds when an empty DataFrame is returned
+    and if_no_data_returned = "skip".
+    An empty CSV file should be generated.
+    """
+    flow = SharepointListToADLS(
+        "test_sharepoint_to_adls_run_flow",
+        output_file_extension=".csv",
+        adls_sp_credentials_secret=CREDENTIALS_SECRET,
+        adls_dir_path=ADLS_DIR_PATH,
+        file_name=ADLS_FILE_NAME_LIST,
+        list_title="",
+        site_url="",
+        if_no_data_returned="skip",
+    )
+    result = flow.run()
+    assert result.is_successful()
+    os.remove(ADLS_FILE_NAME_LIST + ".csv")
+    os.remove("test_sharepoint_to_adls_run_flow.json")
+
+
+@mock.patch(
+    "viadot.tasks.SharepointListToDF.run",
+    return_value=pd.DataFrame(data=EMPTY_DATA),
+)
+@pytest.mark.run
+def test_sharepoint_list_to_adls_run_flow_success_warn_on_no_data_returned(
+    mocked_class,
+):
+    """
+    Check that the flow succeeds (with a warning logged) when an empty
+    DataFrame is returned and if_no_data_returned = "warn".
+    No CSV file should be generated.
+    """
+    flow = SharepointListToADLS(
+        "test_sharepoint_to_adls_run_flow",
+        output_file_extension=".csv",
+        adls_sp_credentials_secret=CREDENTIALS_SECRET,
+        adls_dir_path=ADLS_DIR_PATH,
+        file_name=ADLS_FILE_NAME_LIST,
+        list_title="",
+        site_url="",
+        if_no_data_returned="warn",
+    )
+    result = flow.run()
+    assert result.is_successful()
diff --git a/tests/integration/tasks/test_task_utils.py b/tests/integration/tasks/test_task_utils.py
index f22d55022..d10cceb4d 100644
--- a/tests/integration/tasks/test_task_utils.py
+++ b/tests/integration/tasks/test_task_utils.py
@@ -1,9 +1,15 @@
+import pytest
 import pandas as pd
 from prefect.backend import get_key_value, set_key_value
 from prefect.engine.state import Failed, Success
 from prefect.tasks.secrets import PrefectSecret

-from viadot.task_utils import custom_mail_state_handler, set_new_kv
+from viadot.task_utils import (
+    custom_mail_state_handler,
+    set_new_kv,
+    search_for_msg_in_logs,
+    check_if_df_empty,
+)


 def test_custom_state_handler():
@@ -28,3 +34,37 @@ def test_set_new_kv():
     result = get_key_value("test_for_setting_kv")
     assert result == "72"
     set_key_value(key="test_for_setting_kv", value=None)
+
+
+def test_search_for_msg_in_logs():
+    logs = [
+        {"message": "Error occurred"},
+        {"message": "Warning: Invalid input"},
+        {"message": "Log message"},
+    ]
+
+    # Test when the message is found in the logs
+    assert search_for_msg_in_logs.run(logs, "Error occurred") is True
+
+    # Test when the message is not found in the logs
+    assert search_for_msg_in_logs.run(logs, "Info message") is False
+
+
+def test_check_if_df_empty():
+    df = pd.DataFrame()
+    from prefect.engine import signals
+
+    # Test when the DataFrame is empty and if_no_data_returned is "warn"
+    assert check_if_df_empty.run(df, if_no_data_returned="warn") is True
+
+    # Test when the DataFrame is empty and if_no_data_returned is "fail"
+    with pytest.raises(signals.ENDRUN):
+        check_if_df_empty.run(df, if_no_data_returned="fail")
+
+    # Test when the DataFrame is empty and if_no_data_returned is "skip"
+    assert check_if_df_empty.run(df, if_no_data_returned="skip") is False
+
+    # Test when the DataFrame is not empty
+    df = pd.DataFrame({"col": [1, 2, 3]})
+    assert check_if_df_empty.run(df) is False
diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py
index 79b511c53..0d26d75aa 100644
--- a/viadot/flows/sharepoint_to_adls.py
+++ b/viadot/flows/sharepoint_to_adls.py
@@ -3,7 +3,10 @@ from typing import Any, Dict, List, Literal
 import pendulum
-from prefect import Flow
+from prefect import Flow, task, case
+from prefect.engine.state import Failed
+from prefect.engine.runner import ENDRUN
+from typing import Literal
 from prefect.backend import set_key_value
 from prefect.utilities import logging
@@ -18,6 +21,7 @@ )
 from viadot.tasks import AzureDataLakeUpload
 from viadot.tasks.sharepoint import SharepointListToDF, SharepointToDF
+from viadot.task_utils import check_if_df_empty

 logger = logging.get_logger()
@@ -207,6 +211,7 @@ def __init__(
         sep: str = "\t",
         validate_df_dict: dict = None,
         set_prefect_kv: bool = False,
+        if_no_data_returned: Literal["skip", "warn", "fail"] = "skip",
         *args: List[any],
         **kwargs: Dict[str, Any],
     ):
@@ -219,7 +224,7 @@ def __init__(
             name (str): Prefect flow name.
             list_title (str): Title of Sharepoint List.
             site_url (str): URL to set of Sharepoint Lists.
-            file_name (str): Name of file in ADLS. Defaults to None.
+            file_name (str): Name of file (without extension) in ADLS. Defaults to None.
             adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None.
             filters (dict, optional): Dictionary with operators which filters the SharepointList output. Defaults to None.
                 allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str')
@@ -282,6 +287,7 @@ def __init__(
         self.vault_name = vault_name
         self.row_count = row_count
         self.validate_df_dict = validate_df_dict
+        self.if_no_data_returned = if_no_data_returned

         # AzureDataLakeUpload
         self.adls_dir_path = adls_dir_path
@@ -330,65 +336,71 @@ def gen_flow(self) -> Flow:
             credentials_secret=self.sp_cert_credentials_secret,
         )

-        if self.validate_df_dict:
-            validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
-            validation_task.set_upstream(df, flow=self)
-
-        df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
-        dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
-        df_mapped = df_map_mixed_dtypes_for_parquet.bind(
-            df_with_metadata, dtypes_dict, flow=self
-        )
-
-        if self.output_file_extension == ".csv":
-            df_to_file = df_to_csv.bind(
-                df=df_with_metadata,
-                path=self.local_file_path,
-                sep=self.sep,
-                flow=self,
-            )
-        elif self.output_file_extension == ".parquet":
-            df_to_file = df_to_parquet.bind(
-                df=df_mapped,
-                path=self.local_file_path,
-                flow=self,
-            )
-        else:
-            raise ValueError("Output file extension can only be '.csv' or '.parquet'")
-
-        file_to_adls_task = AzureDataLakeUpload()
-        file_to_adls_task.bind(
-            from_path=self.local_file_path,
-            to_path=self.adls_dir_path,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
-
-        dtypes_to_json_task.bind(
-            dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
-        )
-
-        json_to_adls_task = AzureDataLakeUpload()
-        json_to_adls_task.bind(
-            from_path=self.local_json_path,
-            to_path=self.adls_schema_file_dir_file,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
-
-        if self.validate_df_dict:
-            df_with_metadata.set_upstream(validation_task, flow=self)
-
-        df_mapped.set_upstream(df_with_metadata, flow=self)
-        dtypes_to_json_task.set_upstream(df_mapped, flow=self)
-        df_to_file.set_upstream(dtypes_to_json_task, flow=self)
-
-        file_to_adls_task.set_upstream(df_to_file, flow=self)
-        json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
-        if self.set_prefect_kv == True:
-            set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
+        df_empty = check_if_df_empty.bind(df, self.if_no_data_returned, flow=self)
+
+        with case(df_empty, False):
+            if self.validate_df_dict:
+                validation_task = validate_df(
+                    df=df, tests=self.validate_df_dict, flow=self
+                )
+                validation_task.set_upstream(df, flow=self)
+
+            df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
+            dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
+            df_mapped = df_map_mixed_dtypes_for_parquet.bind(
+                df_with_metadata, dtypes_dict, flow=self
+            )
+
+            if self.output_file_extension == ".csv":
+                df_to_file = df_to_csv.bind(
+                    df=df_with_metadata,
+                    path=self.local_file_path,
+                    sep=self.sep,
+                    flow=self,
+                )
+            elif self.output_file_extension == ".parquet":
+                df_to_file = df_to_parquet.bind(
+                    df=df_mapped,
+                    path=self.local_file_path,
+                    flow=self,
+                )
+            else:
+                raise ValueError(
+                    "Output file extension can only be '.csv' or '.parquet'"
+                )
+
+            file_to_adls_task = AzureDataLakeUpload()
+            file_to_adls_task.bind(
+                from_path=self.local_file_path,
+                to_path=self.adls_dir_path,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )
+
+            dtypes_to_json_task.bind(
+                dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
+            )
+
+            json_to_adls_task = AzureDataLakeUpload()
+            json_to_adls_task.bind(
+                from_path=self.local_json_path,
+                to_path=self.adls_schema_file_dir_file,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )
+
+            if self.validate_df_dict:
+                df_with_metadata.set_upstream(validation_task, flow=self)
+            dtypes_dict.set_upstream(df_with_metadata, flow=self)
+            df_mapped.set_upstream(df_with_metadata, flow=self)
+            dtypes_to_json_task.set_upstream(df_mapped, flow=self)
+            df_to_file.set_upstream(dtypes_to_json_task, flow=self)
+            file_to_adls_task.set_upstream(df_to_file, flow=self)
+            json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
+            if self.set_prefect_kv == True:
+                set_key_value(key=self.adls_dir_path, value=self.adls_file_path)

     @staticmethod
     def slugify(name):
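Reviewer note: the `gen_flow` rework above gates every downstream task on the boolean produced by `check_if_df_empty`. For readers less familiar with Prefect 1.x conditional logic, here is a minimal sketch of the same pattern; it is illustrative only and not part of the patch, and the task and flow names are made up:

```python
import pandas as pd
from prefect import Flow, task, case


@task
def extract() -> pd.DataFrame:
    # Stand-in for the SharepointListToDF task
    return pd.DataFrame()


@task
def is_empty(df: pd.DataFrame) -> bool:
    # Stand-in for check_if_df_empty: True means "no data, stop here"
    return df.empty


@task
def load(df: pd.DataFrame) -> None:
    # Stand-in for the CSV/Parquet dump and ADLS upload tasks
    print(f"Uploading {len(df)} rows")


with Flow("conditional-load-sketch") as flow:
    df = extract()
    df_empty = is_empty(df)
    # Tasks bound inside this context only run when df_empty evaluates to False;
    # otherwise Prefect marks them as Skipped and the flow run still finishes successfully.
    with case(df_empty, False):
        load(df)
```

This is why the "warn" and "skip" modes both end in a successful flow run: "warn" returns True and skips the upload branch, while "skip" returns False and lets it run on the empty DataFrame.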
diff --git a/viadot/task_utils.py b/viadot/task_utils.py
index 6a532f932..4459c715d 100644
--- a/viadot/task_utils.py
+++ b/viadot/task_utils.py
@@ -3,9 +3,10 @@ import os
 import re
 import shutil
+import pendulum
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast, Tuple

 import pandas as pd
 import prefect
@@ -14,6 +15,7 @@ from prefect import Flow, Task, task
 from prefect.backend import set_key_value
 from prefect.engine.state import Failed
+from prefect.engine.runner import ENDRUN
 from prefect.storage import Git
 from prefect.tasks.secrets import PrefectSecret
 from prefect.utilities import logging
@@ -27,6 +29,7 @@ from viadot.exceptions import CredentialError, ValidationError
 from viadot.tasks import AzureDataLakeUpload, AzureKeyVaultSecret

+
 logger = logging.get_logger()

 METADATA_COLUMNS = {"_viadot_downloaded_at_utc": "DATETIME"}
@@ -792,3 +795,234 @@ def validate_df(df: pd.DataFrame, tests: dict = None) -> None:
         raise ValidationError(
             f"Validation failed for {failed_tests} test/tests: {failed_tests_msg}"
         )
+
+
+@task(timeout=3600, slug="check_df")
+def check_if_df_empty(
+    df: pd.DataFrame, if_no_data_returned: Literal["fail", "warn", "skip"] = "fail"
+) -> bool:
+    """
+    Check if a DataFrame received as a data source response is empty.
+    If "fail" is chosen, this task finishes with an ENDRUN(Failed()) state.
+
+    Args:
+        df (pandas.DataFrame): The DataFrame to check.
+        if_no_data_returned (Literal["fail", "warn", "skip"], optional): The action to take if no data is returned in the DataFrame.
+            Options are "fail" (default), "warn", or "skip".
+
+    Returns:
+        bool: True if the DataFrame is empty and the action is "warn", False otherwise
+            (including when the DataFrame is empty and the action is "skip", so that downstream tasks still run).
+
+    Raises:
+        ENDRUN: If the DataFrame is empty and the action is "fail".
+
+    Example:
+        >>> df = pd.DataFrame()
+        >>> check_if_df_empty(df, if_no_data_returned="warn")
+        True
+    """
+    if df.empty:
+        if if_no_data_returned == "warn":
+            logger.warning("No data in the source response. Df empty.")
+            return True
+        elif if_no_data_returned == "fail":
+            raise ENDRUN(state=Failed("No data in the source response. Df empty..."))
+        elif if_no_data_returned == "skip":
+            return False
+    else:
+        return False
+
+
+@task(timeout=3600)
+def get_flow_run_id(client: prefect.Client, flow_name: str, state: str) -> str:
+    """Gets the ID of the most recent flow run with the given flow name and state (flow runs ordered by end time, descending).
+
+    Args:
+        client (prefect.Client): The Prefect client used to execute the GraphQL query.
+        flow_name (str): The name of the flow to search for.
+        state (str): The state of the flow run to filter by.
+
+    Returns:
+        str: The ID of the last flow run that matches the given flow name and state.
+
+    Raises:
+        ValueError: If the given flow name cannot be found in the Prefect Cloud API.
+
+    Example:
+        >>> client = prefect.Client()
+        >>> flow_name = "My Flow"
+        >>> state = "SUCCESS"
+        >>> get_flow_run_id(client, flow_name, state)
+        "flow_run_id_12345"
+    """
+    # Construct the GraphQL query
+    query = f"""
+    {{
+        flow_run(
+            where: {{
+                flow: {{
+                    name: {{_eq: "{flow_name}"}}
+                }}
+                state: {{_eq: "{state}"}}
+            }}
+            order_by : {{end_time: desc}}
+            limit : 1
+        ){{
+            id
+        }}
+    }}
+    """
+    # Execute the GraphQL query
+    response = client.graphql(query)
+    # "flow_run" is returned as a list of matches (limited to one element here)
+    result_data = response.get("data").get("flow_run")
+    if result_data:
+        flow_run_id = result_data[0].get("id")
+        return flow_run_id
+    else:
+        raise ValueError("Given flow name cannot be found in the Prefect Cloud API")
+
+
+@task(timeout=3600)
+def get_task_logs(client: prefect.Client, flow_run_id: str, task_slug: str) -> List:
+    """
+    Retrieves the logs for a specific task in a flow run using the Prefect client and a GraphQL query.
+
+    Args:
+        client (prefect.Client): The Prefect client used to execute the GraphQL query.
+        flow_run_id (str): The ID of the flow run.
+        task_slug (str): The slug of the task to retrieve logs for.
+
+    Returns:
+        List[Dict[str, Union[str, List[Dict[str, str]]]]]: A list of log entries for the specified task.
+            Each log entry is a dictionary with 'message' and 'level' keys.
+
+    Raises:
+        ValueError: If no data is available for the given task slug.
+
+    Example:
+        >>> client = prefect.Client()
+        >>> flow_run_id = "flow_run_id_12345"
+        >>> task_slug = "my_task"
+        >>> get_task_logs(client, flow_run_id, task_slug)
+        [{'message': 'Log message 1', 'level': 'INFO'}, {'message': 'Log message 2', 'level': 'DEBUG'}]
+    """
+    # Construct the GraphQL query
+    query = f"""
+    {{
+        task_run(
+            where: {{
+                flow_run_id: {{_eq: "{flow_run_id}"}},
+                task: {{slug: {{_eq: "{task_slug}"}}}}
+            }}
+        ) {{
+            state
+            logs {{
+                message
+                level
+            }}
+        }}
+    }}
+    """
+    # Execute the GraphQL query
+    logger.info("Executing GraphQL query to get task logs.")
+    response = client.graphql(query)
+    result_data = response.get("data").get("task_run")
+    # Extract task logs from the first matching task run
+    if result_data:
+        logs = result_data[0].get("logs")
+        return logs
+    else:
+        raise ValueError("No data available for the given task slug.")
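Reviewer note: both tasks above unpack a GraphQL response in which `flow_run` and `task_run` come back as lists of matches, which is why the first element is indexed before reading `id` or `logs` (the original `result_data.get("id")[0]` in `get_flow_run_id` was corrected accordingly). A rough sketch of the shape being assumed, with both query results combined for brevity and all field values made up:

```python
# Approximate shape of client.graphql(...) results that the two tasks above unpack.
response = {
    "data": {
        "flow_run": [
            {"id": "flow_run_id_12345"}  # at most one element because of `limit: 1`
        ],
        "task_run": [
            {
                "state": "Success",
                "logs": [
                    {"message": "Log message 1", "level": "INFO"},
                    {"message": "Log message 2", "level": "DEBUG"},
                ],
            }
        ],
    }
}

flow_run_id = response["data"]["flow_run"][0]["id"]
task_logs = response["data"]["task_run"][0]["logs"]
```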
+
+
+@task(timeout=3600)
+def send_email_notification(
+    from_address: Union[str, Tuple],
+    to_address: Union[str, List[str], List[Tuple], Tuple[str]],
+    content: str,
+    subject: str,
+    vault_name: str,
+    mail_credentials_secret: str,
+    timezone: str = "Europe/Warsaw",
+) -> str:
+    """
+    Sends an email notification using the SendGrid API.
+
+    Args:
+        from_address (Union[str, Tuple]): The email address of the sender.
+        to_address (Union[str, List[str], List[Tuple], Tuple[str]]): The email address(es) of the recipient(s).
+        content (str): The content of the email.
+        subject (str): The subject of the email.
+        vault_name (str): The name of the Azure Key Vault.
+        mail_credentials_secret (str): The secret name for the SendGrid API key.
+        timezone (str, optional): The timezone to use for the current datetime. Defaults to "Europe/Warsaw".
+
+    Returns:
+        str: The response from the SendGrid API.
+
+    Raises:
+        Exception: If the API key is not provided.
+
+    Example:
+        >>> send_email_notification("sender@example.com", "recipient@example.com", "Hello!", "Test Email", "my-vault", "sendgrid-api-key")
+        'Email sent successfully'
+    """
+
+    # Retrieve the SendGrid API key; fall back to the default secret name if none was given
+    if mail_credentials_secret is None:
+        mail_credentials_secret = PrefectSecret("SENDGRID_DEFAULT_SECRET").run()
+    if mail_credentials_secret:
+        credentials_str = AzureKeyVaultSecret(
+            mail_credentials_secret, vault_name=vault_name
+        ).run()
+        api_key = json.loads(credentials_str).get("API_KEY")
+    else:
+        raise Exception("Please provide API KEY")
+
+    # Get the current datetime in the specified timezone
+    curr_dt = pendulum.now(tz=timezone)
+
+    # Create the email message
+    message = Mail(
+        from_email=from_address,
+        to_emails=to_address,
+        subject=subject,
+        html_content=f"{content}",
+    )
+
+    # Send the email using the SendGrid API
+    send_grid = SendGridAPIClient(api_key)
+    response = send_grid.send(message)
+    return response
+
+
+@task(timeout=3600)
+def search_for_msg_in_logs(logs: list, log_info: str) -> bool:
+    """
+    Searches for a specific message in Prefect flow or task logs.
+
+    Args:
+        logs (list): The logs to search in.
+        log_info (str): The message to search for.
+
+    Returns:
+        bool: True if the message is found, False otherwise.
+
+    Example:
+        >>> logs = [
+        ...     {"message": "Error occurred"},
+        ...     {"message": "Warning: Invalid input"},
+        ...     {"message": "Log message"}
+        ... ]
+        >>> search_for_msg_in_logs(logs, "Error occurred")
+        True
+    """
+    found_msg = False
+
+    # Iterate over each log entry
+    for value in logs:
+        if value.get("message") == log_info:
+            found_msg = True
+            break
+
+    return found_msg
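Reviewer note: taken together, the new tasks are meant to let one flow in a multiflow react to what a previous flow logged, as described in the CHANGELOG. A hedged sketch of that composition is below; it assumes a Prefect 1.x backend, the flow name, recipient addresses, vault name, and secret name are made up, the state string and log message are assumptions, and error handling is omitted. The task slug "check_df" is the one set on `check_if_df_empty` above.

```python
import prefect
from prefect import Flow, case

from viadot.task_utils import (
    get_flow_run_id,
    get_task_logs,
    search_for_msg_in_logs,
    send_email_notification,
)

with Flow("multiflow-controller-sketch") as flow:
    client = prefect.Client()

    # Find the most recent run of the upstream flow that ended in the given state.
    flow_run_id = get_flow_run_id(client, flow_name="My Upstream Flow", state="Success")

    # Pull the logs of the empty-DataFrame check task from that run.
    logs = get_task_logs(client, flow_run_id=flow_run_id, task_slug="check_df")

    # Did that task warn about an empty source response?
    msg_found = search_for_msg_in_logs(
        logs, "No data in the source response. Df empty."
    )

    # React only when the message was found; a downstream flow could be gated the same way.
    with case(msg_found, True):
        send_email_notification(
            from_address="sender@example.com",
            to_address="recipient@example.com",
            content="Upstream flow returned no data.",
            subject="Empty source response",
            vault_name="my-vault",
            mail_credentials_secret="sendgrid-api-key",
        )
```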