diff --git a/CHANGELOG.md b/CHANGELOG.md index 20efb4496..76eb3280b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +## [0.4.21] - 2023-10-26 ### Added +- Added `validate_df` task to task_utils. +- Added `SharepointList` source class. +- Added `SharepointListToDF` task class. +- Added `SharepointListToADLS` flow class. +- Added tests for `SharepointList`. +- Added `get_nested_dict` to untils.py. ### Fixed ### Changed - +- Changed `GenesysToCSV` logic for end_point == "conversations". Added new fields to extraction. ## [0.4.20] - 2023-10-12 ### Added @@ -618,4 +626,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret - Moved from poetry to pip ### Fixed -- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part +- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part \ No newline at end of file diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py index 1a867b58c..de793344a 100644 --- a/tests/integration/flows/test_bigquery_to_adls.py +++ b/tests/integration/flows/test_bigquery_to_adls.py @@ -1,11 +1,16 @@ import os import pendulum -from prefect.tasks.secrets import PrefectSecret +import pytest +from unittest import mock +import pandas as pd +from prefect.tasks.secrets import PrefectSecret from viadot.flows import BigQueryToADLS from viadot.tasks import AzureDataLakeRemove +from viadot.exceptions import ValidationError + ADLS_DIR_PATH = "raw/tests/" ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet" BIGQ_CREDENTIAL_KEY = "BIGQUERY-TESTS" @@ -72,6 +77,68 @@ def test_bigquery_to_adls_false(): assert result.is_failed() os.remove("test_bigquery_to_adls_overwrite_false.parquet") os.remove("test_bigquery_to_adls_overwrite_false.json") + + +DATA = { + "type": ["banner", "banner"], + "country": ["PL", "DE"], +} + + +@mock.patch( + "viadot.tasks.BigQueryToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_bigquery_to_adls_validate_df_fail(mocked_data): + flow_bigquery = BigQueryToADLS( + name="Test BigQuery to ADLS validate df fail", + dataset_name="official_empty", + table_name="space", + credentials_key=BIGQ_CREDENTIAL_KEY, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, + validate_df_dict={"column_list_to_match": ["type", "country", "test"]}, + ) + try: + result = flow_bigquery.run() + except ValidationError: + pass + + os.remove("test_bigquery_to_adls_validate_df_fail.parquet") + os.remove("test_bigquery_to_adls_validate_df_fail.json") + + +@mock.patch( + "viadot.tasks.BigQueryToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_bigquery_to_adls_validate_df_success(mocked_data): + flow_bigquery = BigQueryToADLS( + name="Test BigQuery to ADLS validate df success", + dataset_name="official_empty", + table_name="space", + credentials_key=BIGQ_CREDENTIAL_KEY, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, + validate_df_dict={"column_list_to_match": ["type", "country"]}, + ) + result = flow_bigquery.run() + + result = flow_bigquery.run() + assert result.is_successful() + + task_results = result.result.values() + assert 
all([task_result.is_successful() for task_result in task_results]) + + os.remove("test_bigquery_to_adls_validate_df_success.parquet") + os.remove("test_bigquery_to_adls_validate_df_success.json") + rm = AzureDataLakeRemove( path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" ) diff --git a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py index b506e4a14..f0661e314 100644 --- a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py +++ b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py @@ -1,5 +1,6 @@ from viadot.config import local_config from viadot.flows import CloudForCustomersReportToADLS +from viadot.exceptions import ValidationError def test_cloud_for_customers_report_to_adls(): @@ -27,3 +28,61 @@ def test_cloud_for_customers_report_to_adls(): task_results = result.result.values() assert all([task_result.is_successful() for task_result in task_results]) + + assert len(flow.tasks) == 6 + + +def test_cloud_for_customers_report_to_adls_validation_fail(caplog): + credentials = local_config.get("CLOUD_FOR_CUSTOMERS") + credentials_prod = credentials["Prod"] + channels = ["VEL_B_AFS", "VEL_B_ASA"] + month = ["01"] + year = ["2021"] + flow = CloudForCustomersReportToADLS( + report_url=credentials_prod["server"], + env="Prod", + channels=channels, + months=month, + years=year, + name="test_c4c_report_to_adls", + local_file_path=f"test_c4c_report_to_adls.csv", + adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"], + adls_dir_path=credentials["adls_dir_path"], + validate_df_dict={"column_size": {"ChannelName ID": 10}}, + ) + try: + result = flow.run() + except ValidationError: + pass + + +def test_cloud_for_customers_report_to_adls_validation_success(): + credentials = local_config.get("CLOUD_FOR_CUSTOMERS") + credentials_prod = credentials["Prod"] + channels = ["VEL_B_AFS", "VEL_B_ASA"] + month = ["01"] + year = ["2021"] + flow = CloudForCustomersReportToADLS( + report_url=credentials_prod["server"], + env="Prod", + channels=channels, + months=month, + years=year, + name="test_c4c_report_to_adls", + local_file_path=f"test_c4c_report_to_adls.csv", + adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"], + adls_dir_path=credentials["adls_dir_path"], + validate_df_dict={"column_size": {"ChannelName ID": 13}}, + ) + + try: + result = flow.run() + except ValidationError: + assert False, "Validation failed but was expected to pass" + + assert result.is_successful() + + task_results = result.result.values() + assert all([task_result.is_successful() for task_result in task_results]) + + assert len(flow.tasks) == 7 diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index e6cdf1545..0e7afd3e2 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -5,6 +5,7 @@ import pytest from viadot.flows import CustomerGaugeToADLS +from viadot.exceptions import ValidationError DATA = { "user_name": ["Jane", "Bob"], @@ -15,6 +16,7 @@ "user_address_country_name": "United States", "user_address_country_code": "US", } + COLUMNS = ["user_name", "user_address_street"] ADLS_FILE_NAME = "test_customer_gauge.parquet" ADLS_DIR_PATH = "raw/tests/" @@ -40,3 +42,54 @@ def test_customer_gauge_to_adls_run_flow(mocked_class): assert result.is_successful() os.remove("test_customer_gauge_to_adls_flow_run.parquet") 
os.remove("test_customer_gauge_to_adls_flow_run.json") + + +@mock.patch( + "viadot.tasks.CustomerGaugeToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class): + flow = CustomerGaugeToADLS( + "test_customer_gauge_to_adls_run_flow_validation_success", + endpoint="responses", + total_load=False, + anonymize=True, + columns_to_anonymize=COLUMNS, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + validate_df_dict={"column_size": {"user_address_state": 2}}, + ) + result = flow.run() + assert result.is_successful() + assert len(flow.tasks) == 11 + + os.remove("test_customer_gauge_to_adls_run_flow_validation_success.parquet") + os.remove("test_customer_gauge_to_adls_run_flow_validation_success.json") + + +@mock.patch( + "viadot.tasks.CustomerGaugeToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_customer_gauge_to_adls_run_flow_validation_failure(mocked_class): + flow = CustomerGaugeToADLS( + "test_customer_gauge_to_adls_run_flow_validation_failure", + endpoint="responses", + total_load=False, + anonymize=True, + columns_to_anonymize=COLUMNS, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + validate_df_dict={"column_size": {"user_name": 5}}, + ) + try: + flow.run() + except ValidationError: + pass + + os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.parquet") + os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.json") diff --git a/tests/integration/flows/test_eurostat_to_adls.py b/tests/integration/flows/test_eurostat_to_adls.py index 9225655e9..e15f10a7d 100644 --- a/tests/integration/flows/test_eurostat_to_adls.py +++ b/tests/integration/flows/test_eurostat_to_adls.py @@ -6,7 +6,11 @@ from viadot.flows import EurostatToADLS -DATA = {"geo": ["PL", "DE", "NL"], "indicator": [35, 55, 77]} +DATA = { + "geo": ["PL", "DE", "NL"], + "indicator": [35, 55, 77], + "time": ["2023-01", "2023-51", "2023-07"], +} ADLS_FILE_NAME = "test_eurostat.parquet" ADLS_DIR_PATH = "raw/tests/" @@ -28,3 +32,23 @@ def test_eurostat_to_adls_run_flow(mocked_class): assert result.is_successful() os.remove("test_eurostat_to_adls_flow_run.parquet") os.remove("test_eurostat_to_adls_flow_run.json") + + +@mock.patch( + "viadot.tasks.EurostatToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_validate_df(mocked_class): + flow = EurostatToADLS( + "test_validate_df", + dataset_code="ILC_DI04", + overwrite_adls=True, + validate_df_dict={"column_size": {"time": 7}}, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_validate_df.parquet") + os.remove("test_validate_df.json") diff --git a/tests/integration/flows/test_hubspot_to_adls.py b/tests/integration/flows/test_hubspot_to_adls.py index a67d00aca..d960fc079 100644 --- a/tests/integration/flows/test_hubspot_to_adls.py +++ b/tests/integration/flows/test_hubspot_to_adls.py @@ -6,6 +6,7 @@ import pytest from viadot.flows import HubspotToADLS +from viadot.exceptions import ValidationError DATA = { "id": {"0": "820306930"}, @@ -60,3 +61,81 @@ def test_hubspot_to_adls_flow_run(mocked_class): assert result.is_successful() os.remove("test_hubspot_to_adls_flow_run.parquet") os.remove("test_hubspot_to_adls_flow_run.json") + + +@mock.patch( + "viadot.tasks.HubspotToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def 
test_hubspot_to_adls_flow_run_validate_fail(mocked_class): + flow = HubspotToADLS( + "test_hubspot_to_adls_flow_run", + hubspot_credentials_key="HUBSPOT", + endpoint="line_items", + filters=[ + { + "filters": [ + { + "propertyName": "createdate", + "operator": "BETWEEN", + "highValue": "2021-01-01", + "value": "2021-01-01", + }, + {"propertyName": "quantity", "operator": "EQ", "value": "2"}, + ] + }, + { + "filters": [ + {"propertyName": "amount", "operator": "EQ", "value": "3744.000"} + ] + }, + ], + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={"column_size": {"id": 0}}, + ) + try: + flow.run() + except ValidationError: + pass + + +@mock.patch( + "viadot.tasks.HubspotToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_hubspot_to_adls_flow_run_validate_success(mocked_class): + flow = HubspotToADLS( + "test_hubspot_to_adls_flow_run", + hubspot_credentials_key="HUBSPOT", + endpoint="line_items", + filters=[ + { + "filters": [ + { + "propertyName": "createdate", + "operator": "BETWEEN", + "highValue": "2021-01-01", + "value": "2021-01-01", + }, + {"propertyName": "quantity", "operator": "EQ", "value": "2"}, + ] + }, + { + "filters": [ + {"propertyName": "amount", "operator": "EQ", "value": "3744.000"} + ] + }, + ], + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={"column_unique_values": ["id"]}, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_hubspot_to_adls_flow_run.parquet") + os.remove("test_hubspot_to_adls_flow_run.json") diff --git a/tests/integration/flows/test_mediatool_to_adls.py b/tests/integration/flows/test_mediatool_to_adls.py index 88746d16b..d7b5b2658 100644 --- a/tests/integration/flows/test_mediatool_to_adls.py +++ b/tests/integration/flows/test_mediatool_to_adls.py @@ -5,6 +5,7 @@ import pytest from viadot.flows import MediatoolToADLS +from viadot.exceptions import ValidationError DATA = {"country": ["DK", "DE"], "sales": [3, 4]} ADLS_FILE_NAME = "test_mediatool.parquet" @@ -28,5 +29,51 @@ def test_mediatool_to_adls_run_flow(mocked_class): ) result = flow.run() assert result.is_successful() + assert len(flow.tasks) == 10 + os.remove("test_mediatool_to_adls_flow_run.parquet") + os.remove("test_mediatool_to_adls_flow_run.json") + + +@mock.patch( + "viadot.tasks.MediatoolToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_mediatool_to_adls_run_flow_validate_fail(mocked_class): + flow = MediatoolToADLS( + "test_mediatool_to_adls_flow_run", + organization_ids=["1000000001", "200000001"], + media_entries_columns=["id", "name", "num"], + mediatool_credentials_key="MEDIATOOL-TESTS", + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={"column_size": {"country": 10}}, + ) + try: + flow.run() + except ValidationError: + pass + + +@mock.patch( + "viadot.tasks.MediatoolToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_mediatool_to_adls_run_flow_validate_success(mocked_class): + flow = MediatoolToADLS( + "test_mediatool_to_adls_flow_run", + organization_ids=["1000000001", "200000001"], + media_entries_columns=["id", "name", "num"], + mediatool_credentials_key="MEDIATOOL-TESTS", + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={"column_size": {"country": 2}}, + ) + result = flow.run() + assert result.is_successful() + assert len(flow.tasks) == 11 
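+    # One more task than in the baseline run above: passing validate_df_dict adds the validate_df task to the flow.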
os.remove("test_mediatool_to_adls_flow_run.parquet") os.remove("test_mediatool_to_adls_flow_run.json") diff --git a/tests/integration/flows/test_mysql_to_adls.py b/tests/integration/flows/test_mysql_to_adls.py index a66fca5db..942bab99d 100644 --- a/tests/integration/flows/test_mysql_to_adls.py +++ b/tests/integration/flows/test_mysql_to_adls.py @@ -1,5 +1,4 @@ from unittest import mock - from viadot.flows.mysql_to_adls import MySqlToADLS query = """SELECT * FROM `example-views`.`sales`""" @@ -23,3 +22,18 @@ def test_adls_gen1_to_azure_sql_new_mock(TEST_PARQUET_FILE_PATH): ) flow.run() mock_method.assert_called_with() + + +def test_validate_df(TEST_PARQUET_FILE_PATH): + with mock.patch.object(MySqlToADLS, "run", return_value=True) as mock_method: + flow = MySqlToADLS( + "test validate_df", + country_short="DE", + query=query, + file_path=TEST_PARQUET_FILE_PATH, + sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + to_path=f"raw/examples/{TEST_PARQUET_FILE_PATH}", + validate_df_dict={"column_size": {"sales_org": 3}}, + ) + flow.run() + mock_method.assert_called_with() diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py index 60ec41aaa..ec68a1227 100644 --- a/tests/integration/flows/test_salesforce_to_adls.py +++ b/tests/integration/flows/test_salesforce_to_adls.py @@ -4,6 +4,7 @@ from viadot.flows import SalesforceToADLS from viadot.tasks import AzureDataLakeRemove +from viadot.exceptions import ValidationError ADLS_FILE_NAME = "test_salesforce.parquet" ADLS_DIR_PATH = "raw/tests/" @@ -32,3 +33,29 @@ def test_salesforce_to_adls(): vault_name="azuwevelcrkeyv001s", ) rm.run(sp_credentials_secret=credentials_secret) + + +def test_salesforce_to_adls_validate_success(): + credentials_secret = PrefectSecret( + "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" + ).run() + + flow = SalesforceToADLS( + "test_salesforce_to_adls_run_flow", + query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50", + adls_sp_credentials_secret=credentials_secret, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={"column_list_to_match": ["IsDeleted", "FiscalYear"]}, + ) + + result = flow.run() + assert result.is_successful() + + os.remove("test_salesforce_to_adls_run_flow.parquet") + os.remove("test_salesforce_to_adls_run_flow.json") + rm = AzureDataLakeRemove( + path=ADLS_DIR_PATH + ADLS_FILE_NAME, + vault_name="azuwevelcrkeyv001s", + ) + rm.run(sp_credentials_secret=credentials_secret) diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py index c337336de..2c01049e8 100644 --- a/tests/integration/flows/test_sap_bw_to_adls.py +++ b/tests/integration/flows/test_sap_bw_to_adls.py @@ -5,6 +5,7 @@ import pytest from viadot.flows import SAPBWToADLS +from viadot.exceptions import ValidationError DATA = { "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], @@ -51,3 +52,74 @@ def test_sap_bw_to_adls_flow_run(mocked_class): assert result.is_successful() os.remove("test_sap_bw_to_adls_flow_run.parquet") os.remove("test_sap_bw_to_adls_flow_run.json") + + +def test_sap_bw_validate_df_task_success(caplog): + flow = SAPBWToADLS( + "test_sap_bw_validate_df_task_success", + mdx_query=""" + SELECT + { [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS, + NON EMPTY + { + { [0CALDAY].[20230102] } * + { + [0COMP_CODE].[1120] + } + } + ON ROWS + FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA + """, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, 
+ validate_df_dict={ + "column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 10}, + "column_list_to_match": [ + "[0CALDAY].[LEVEL01].[MEMBER_CAPTION]", + "[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]", + "[Measures].[003YPR4ERQZRMSMFEQ8123HRR]", + ], + }, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_sap_bw_validate_df_task_success.parquet") + os.remove("test_sap_bw_validate_df_task_success.json") + + +def test_sap_bw_validate_df_task_fail(caplog): + flow = SAPBWToADLS( + "test_sap_bw_validate_df_task_fail", + mdx_query=""" + SELECT + { [Measures].[003YPR4ERQZRMSMFEQ8123HRR] } ON COLUMNS, + NON EMPTY + { + { [0CALDAY].[20230102] } * + { + [0COMP_CODE].[1120] + } + } + ON ROWS + FROM ZCSALBIL1/ZBW4_ZCSALBIL1_002_BOA + """, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + validate_df_dict={ + "column_size": {"[0CALDAY].[LEVEL01].[MEMBER_CAPTION]": 8}, + "column_list_to_match": [ + "[0CALDAY].[LEVEL01].[MEMBER_CAPTION]", + "[0COMP_CODE].[LEVEL01].[MEMBER_CAPTION]", + ], + }, + ) + + try: + flow.run() + except ValidationError: + pass + + os.remove("test_sap_bw_validate_df_task_fail.parquet") + os.remove("test_sap_bw_validate_df_task_fail.json") diff --git a/tests/integration/flows/test_sap_rfc_to_adls.py b/tests/integration/flows/test_sap_rfc_to_adls.py index 5b9f0d556..ed33fa320 100644 --- a/tests/integration/flows/test_sap_rfc_to_adls.py +++ b/tests/integration/flows/test_sap_rfc_to_adls.py @@ -2,6 +2,7 @@ from viadot.flows import SAPRFCToADLS from viadot.sources import AzureDataLake from viadot.tasks import AzureDataLakeRemove +from viadot.exceptions import ValidationError try: import pyrfc @@ -10,15 +11,15 @@ ADLS_PATH = "raw/supermetrics/mp/test_file_sap.parquet" FILE_NAME = "test_file.parquet" +SAP_TEST_CREDS = local_config.get("SAP").get("QA") def test_sap_rfc_to_adls_query(): - sap_test_creds = local_config.get("SAP").get("QA") flow = SAPRFCToADLS( name="test flow", - query="SELECT MATNR, MATKL FROM MARA WHERE LAEDA LIKE '2022%'", + query="SELECT MATNR, MATKL FROM MARA WHERE LAEDA LIKE '2022%' LIMIT 5", func="BBP_RFC_READ_TABLE", - sap_credentials=sap_test_creds, + sap_credentials=SAP_TEST_CREDS, local_file_path=FILE_NAME, adls_path=ADLS_PATH, overwrite=True, @@ -27,4 +28,41 @@ def test_sap_rfc_to_adls_query(): assert result.is_successful() file = AzureDataLake(ADLS_PATH) assert file.exists() + assert len(flow.tasks) == 3 + AzureDataLakeRemove(ADLS_PATH) + + +def test_sap_rfc_to_adls_validation_fail(): + flow = SAPRFCToADLS( + name="test flow", + query="SELECT MATNR, MATKL FROM MARA WHERE LAEDA LIKE '2022%' LIMIT 5", + func="BBP_RFC_READ_TABLE", + sap_credentials=SAP_TEST_CREDS, + local_file_path=FILE_NAME, + adls_path=ADLS_PATH, + overwrite=True, + validate_df_dict={"column_list_to_match": ["MATNR"]}, + ) + try: + result = flow.run() + except ValidationError: + pass + + +def test_sap_rfc_to_adls_validation_success(): + flow = SAPRFCToADLS( + name="test flow", + query="SELECT MATNR, MATKL FROM MARA WHERE LAEDA LIKE '2022%' LIMIT 5", + func="BBP_RFC_READ_TABLE", + sap_credentials=SAP_TEST_CREDS, + local_file_path=FILE_NAME, + adls_path=ADLS_PATH, + overwrite=True, + validate_df_dict={"column_list_to_match": ["MATNR", "MATKL"]}, + ) + result = flow.run() + assert result.is_successful() + file = AzureDataLake(ADLS_PATH) + assert file.exists() + assert len(flow.tasks) == 4 AzureDataLakeRemove(ADLS_PATH) diff --git a/tests/integration/flows/test_supermetrics_to_adls.py b/tests/integration/flows/test_supermetrics_to_adls.py 
index d901ad7ad..9738ddeb1 100644 --- a/tests/integration/flows/test_supermetrics_to_adls.py +++ b/tests/integration/flows/test_supermetrics_to_adls.py @@ -5,9 +5,10 @@ from prefect.storage import Local from viadot.flows import SupermetricsToADLS +from viadot.exceptions import ValidationError CWD = os.getcwd() -adls_dir_path = "raw/supermetrics" +adls_dir_path = "raw/tests/supermetrics" STORAGE = Local(path=CWD) logger = logging.getLogger(__name__) @@ -110,3 +111,91 @@ def test_supermetrics_to_adls_file_name(expectation_suite): ) result = flow.run() assert result.is_successful() + + +def test_supermetrics_to_adls_validate_df_success(expectation_suite): + flow = SupermetricsToADLS( + "test_supermetrics_to_adls_validate_df_success", + ds_id="GA", + ds_segments=[ + "R1fbzFNQQ3q_GYvdpRr42w", + "I8lnFFvdSFKc50lP7mBKNA", + "Lg7jR0VWS5OqGPARtGYKrw", + "h8ViuGLfRX-cCL4XKk6yfQ", + "-1", + ], + ds_accounts=["8326007", "58338899"], + date_range_type="last_year_inc", + fields=[ + {"id": "Date"}, + {"id": "segment", "split": "column"}, + {"id": "AvgPageLoadTime_calc"}, + ], + settings={"avoid_sampling": "true"}, + order_columns="alphabetic", + max_columns=100, + max_rows=10, + expectation_suite=expectation_suite, + evaluation_parameters=dict(previous_run_row_count=9), + adls_dir_path=adls_dir_path, + parallel=False, + validate_df_dict={ + "column_size": {"Date": 10}, + "column_list_to_match": [ + "Date", + "All Users", + "M-Site_Better Space: All Landing Page Sessions", + "M-site_Accessories: All Landing Page Sessions", + "M-site_More Space: All Landing Page Sessions", + "M-site_Replacement: All Landing Page Sessions", + ], + }, + ) + result = flow.run() + assert result.is_successful() + + task_results = result.result.values() + assert all([task_result.is_successful() for task_result in task_results]) + + +def test_supermetrics_to_adls_validate_df_fail(expectation_suite): + flow = SupermetricsToADLS( + "test_supermetrics_to_adls_validate_df_fail", + ds_id="GA", + ds_segments=[ + "R1fbzFNQQ3q_GYvdpRr42w", + "I8lnFFvdSFKc50lP7mBKNA", + "Lg7jR0VWS5OqGPARtGYKrw", + "h8ViuGLfRX-cCL4XKk6yfQ", + "-1", + ], + ds_accounts=["8326007", "58338899"], + date_range_type="last_year_inc", + fields=[ + {"id": "Date"}, + {"id": "segment", "split": "column"}, + {"id": "AvgPageLoadTime_calc"}, + ], + settings={"avoid_sampling": "true"}, + order_columns="alphabetic", + max_columns=100, + max_rows=10, + expectation_suite=expectation_suite, + evaluation_parameters=dict(previous_run_row_count=9), + adls_dir_path=adls_dir_path, + parallel=False, + validate_df_dict={ + "column_list_to_match": [ + "All Users", + "All Landing Page Sessions", + "All Landing Page Sessions", + "All Landing Page Sessions", + "All Landing Page Sessions", + ], + }, + ) + + try: + flow.run() + except ValidationError: + pass diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py index a0c86c2ec..c18eaad10 100644 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -5,9 +5,11 @@ import pytest from viadot.flows import VidClubToADLS +from viadot.exceptions import ValidationError DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} ADLS_FILE_NAME = "test_vid_club.parquet" +ADLS_FILE_NAME2 = "test_vid_club_validate_df.parquet" ADLS_DIR_PATH = "raw/test/" @@ -29,3 +31,62 @@ def test_vidclub_to_adls_run_flow(mocked_class): assert result.is_successful() os.remove("test_vidclub_to_adls_flow_run.parquet") 
os.remove("test_vidclub_to_adls_flow_run.json") + + +def test_vidclub_validate_df_task_success(caplog): + flow = VidClubToADLS( + "test_vidclub_validate_df_task_success", + source="product", + cols_to_drop=[ + "submissionProductID", + "submissionProductDate", + "brand", + "productCode", + ], + from_date="2023-10-25", + to_date="2023-10-25", + adls_dir_path="raw/tests", + adls_file_name="test.parquet", + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + overwrite_adls=True, + validate_df_dict={ + "column_size": {"submissionID": 5}, + "column_list_to_match": [ + "submissionID", + "regionID", + "productQuantity", + "unit", + ], + }, + ) + + result = flow.run() + assert result.is_successful() + + +def test_vidclub_validate_df_task_fail(caplog): + flow = VidClubToADLS( + "test_vidclub_validate_df_task_fail", + source="product", + cols_to_drop=[ + "submissionProductID", + "submissionProductDate", + "brand", + "productCode", + ], + from_date="2023-10-25", + to_date="2023-10-25", + adls_dir_path="raw/tests", + adls_file_name="test.parquet", + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + overwrite_adls=True, + validate_df_dict={ + "column_size": {"submissionID": 5}, + "column_unique_values": ["regionID"], + }, + ) + + try: + flow.run() + except ValidationError: + pass diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index c784fa682..502ffded0 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,6 +1,8 @@ import os +import re import pandas as pd +from copy import deepcopy import pytest from prefect.tasks.secrets import PrefectSecret @@ -9,6 +11,7 @@ from viadot.sources import Sharepoint from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF +from viadot.sources import SharepointList def get_url() -> str: @@ -18,7 +21,7 @@ def get_url() -> str: Returns: str: File URL. """ - return local_config["SHAREPOINT"].get("url") + return local_config["SHAREPOINT"].get("file_url") @pytest.fixture(scope="session") @@ -163,3 +166,191 @@ def test_get_data_types(file_name): dtypes = dtypes_map.values() assert "String" in dtypes + + +@pytest.fixture(scope="session") +def sharepoint_list(): + """ + Fixture for creating a Sharepoint class instance. + The class instance can be used within a test functions to interact with Sharepoint. 
+ """ + spl = SharepointList() + yield spl + + +def test_valid_filters(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": "<", "value1": 10}, + "filter2": {"dtype": "str", "operator1": "==", "value1": "value"}, + } + result = sharepoint_list.check_filters(filters) + assert result is True + + +def test_invalid_dtype(sharepoint_list): + filters = { + "filter1": {"dtype": "list", "operator1": ">", "value1": 10}, + } + with pytest.raises(ValueError, match="dtype not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_missing_operator1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "value1": 10}, + } + with pytest.raises(ValueError, match="Operator1 is missing!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_operator1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": "*", "value1": 10}, + } + with pytest.raises(ValueError, match="Operator type not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_missing_value1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": ">", "value1": None}, + } + with pytest.raises(ValueError, match="Value for operator1 is missing!"): + sharepoint_list.check_filters(filters) + + +def test_missing_operators_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + }, + } + with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_operators_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + "operators_conjuction": "!", + }, + } + with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_filters_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "filters_conjuction": "!", + }, + } + with pytest.raises( + ValueError, match="Filters operators for conjuction not allowed!" + ): + sharepoint_list.check_filters(filters) + + +def test_valid_mapping(sharepoint_list): + filters = { + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, + "filter2": {"operator1": "==", "operator2": "!=", "operators_conjuction": "|"}, + } + expected_result = { + "filter1": { + "operator1": "gt", + "operator2": "le", + "operators_conjuction": "and", + "filters_conjuction": "or", + }, + "filter2": {"operator1": "eq", "operator2": "ne", "operators_conjuction": "or"}, + } + result = sharepoint_list.operators_mapping(deepcopy(filters)) + assert result == expected_result + + +def test_invalid_comparison_operator(sharepoint_list): + filters = { + "filter1": { + "operator1": "*", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, + } + error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + + +def test_invalid_logical_operator(sharepoint_list): + filters = { + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "!", + "filters_conjuction": "|", + }, + } + error_message = "This conjuction(logical) operator: ! is not allowed. 
Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + + +def test_single_filter_datetime_api(sharepoint_list): + filters = { + "date_column": {"dtype": "datetime", "operator1": ">", "value1": "2023-01-01"} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "date_column gt datetime'2023-01-01T00:00:00' " + assert result == expected_result + + +def test_multiple_filters_api(sharepoint_list): + filters = { + "int_column": { + "dtype": "int", + "operator1": ">=", + "value1": 10, + "operator2": "<", + "value2": 20, + }, + "str_column": {"dtype": "str", "operator1": "==", "value1": "example"}, + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "int_column ge '10'int_column lt '20'str_column eq 'example'" + assert result == expected_result + + +def test_single_df_filter(sharepoint_list): + filters = {"column1": {"operator1": ">", "value1": 10}} + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')]" + assert result == expected_result + + +def test_multiple_df_filters(sharepoint_list): + filters = { + "column1": {"operator1": ">", "value1": 10, "filters_conjuction": "&"}, + "column2": {"operator1": "<", "value1": 20}, + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')&(df.column2 < '20')]" + assert result == expected_result diff --git a/tests/test_viadot.py b/tests/test_viadot.py index bffda072a..1f0874453 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.20" + assert __version__ == "0.4.21" diff --git a/tests/unit/test_task_utils.py b/tests/unit/test_task_utils.py index 1faee7071..e77c24fdd 100644 --- a/tests/unit/test_task_utils.py +++ b/tests/unit/test_task_utils.py @@ -2,11 +2,10 @@ from typing import List from unittest import mock -import numpy as np import pandas as pd -import prefect import pytest +from viadot.exceptions import ValidationError from viadot.task_utils import ( add_ingestion_metadata_task, adls_bulk_upload, @@ -21,6 +20,7 @@ dtypes_to_json_task, union_dfs_task, write_to_json, + validate_df, ) @@ -393,3 +393,99 @@ def test_wrong_method(): ) with pytest.raises(ValueError, match="Method not found"): anonymize_df.run(data, ["name", "last_name", "email"], method="anonymize") + + +def test_validate_df_column_size_pass(): + df = pd.DataFrame({"col1": ["a", "bb", "ccc"]}) + tests = {"column_size": {"col1": 3}} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_column_size_fail(): + df = pd.DataFrame({"col1": ["a", "bb", "cccc"]}) + tests = {"column_size": {"col1": 3}} + with pytest.raises(ValidationError): + validate_df.run(df, tests) + + +def test_validate_df_column_unique_values_pass(): + df = pd.DataFrame({"col1": [1, 2, 3]}) + tests = {"column_unique_values": ["col1"]} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_column_unique_values_fail(): + df = pd.DataFrame({"col1": [1, 2, 2]}) + tests = {"column_unique_values": ["col1"]} + with pytest.raises(ValidationError): + validate_df.run(df, tests) + + +def test_validate_df_column_list_to_match_pass(): + df = pd.DataFrame({"col1": [1], "col2": [2]}) + tests = {"column_list_to_match": 
["col1", "col2"]} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_column_list_to_match_fail(): + df = pd.DataFrame({"col1": [1]}) + tests = {"column_list_to_match": ["col1", "col2"]} + with pytest.raises(ValidationError): + validate_df.run(df, tests) + + +def test_validate_df_dataset_row_count_pass(): + df = pd.DataFrame({"col1": [1, 2, 3]}) + tests = {"dataset_row_count": {"min": 1, "max": 5}} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_dataset_row_count_fail(): + df = pd.DataFrame({"col1": [1, 2, 3, 4, 5, 6]}) + tests = {"dataset_row_count": {"min": 1, "max": 5}} + with pytest.raises(ValidationError): + validate_df.run(df, tests) + + +def test_validate_df_column_match_regex_pass(): + df = pd.DataFrame({"col1": ["A12", "B34", "C45"]}) + tests = {"column_match_regex": {"col1": "^[A-Z][0-9]{2}$"}} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_column_match_regex_fail(): + df = pd.DataFrame({"col1": ["A123", "B34", "C45"]}) + tests = {"column_match_regex": {"col1": "^[A-Z][0-9]{2}$"}} + with pytest.raises(ValidationError): + validate_df.run(df, tests) + + +def test_validate_df_column_sum_pass(): + df = pd.DataFrame({"col1": [1, 2, 3]}) + tests = {"column_sum": {"col1": {"min": 5, "max": 10}}} + try: + validate_df.run(df, tests) + except ValidationError: + assert False, "Validation failed but was expected to pass" + + +def test_validate_df_column_sum_fail(): + df = pd.DataFrame({"col1": [1, 2, 3, 4]}) + tests = {"column_sum": {"col1": {"min": 5, "max": 6}}} + with pytest.raises(ValidationError): + validate_df.run(df, tests) diff --git a/viadot/__init__.py b/viadot/__init__.py index b4ed79e09..e427a5547 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.20" +__version__ = "0.4.21" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index de2f618ab..e138735d6 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS +from .sharepoint_to_adls import SharepointToADLS, SharepointListToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL diff --git a/viadot/flows/aselite_to_adls.py b/viadot/flows/aselite_to_adls.py index 86e9b215b..61a91c963 100644 --- a/viadot/flows/aselite_to_adls.py +++ b/viadot/flows/aselite_to_adls.py @@ -2,7 +2,12 @@ from prefect import Flow -from viadot.task_utils import df_clean_column, df_converts_bytes_to_int, df_to_csv +from viadot.task_utils import ( + df_clean_column, + df_converts_bytes_to_int, + df_to_csv, + validate_df, +) from viadot.tasks import AzureDataLakeUpload from viadot.tasks.aselite import ASELiteToDF @@ -19,6 +24,7 @@ def __init__( to_path: str = None, if_exists: Literal["replace", "append", "delete"] = "replace", overwrite: bool = True, + validate_df_dict: Dict[str, Any] = None, convert_bytes: bool = False, sp_credentials_secret: str = None, remove_special_characters: bool = None, @@ -41,6 +47,8 @@ def __init__( to_path (str): The path to an ADLS file. Defaults to None. 
if_exists (Literal, optional): What to do if the table exists. Defaults to "replace". overwrite (str, optional): Whether to overwrite the destination file. Defaults to True. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. remove_special_characters (str, optional): Call a function that remove special characters like escape symbols. Defaults to None. @@ -53,6 +61,7 @@ def __init__( self.sqldb_credentials_secret = sqldb_credentials_secret self.vault_name = vault_name self.overwrite = overwrite + self.validate_df_dict = validate_df_dict self.file_path = file_path self.sep = sep @@ -83,6 +92,12 @@ def gen_flow(self) -> Flow: if self.remove_special_characters == True: df = df_clean_column(df, columns_to_clean=self.columns_to_clean, flow=self) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + create_csv = df_to_csv.bind( df, path=self.file_path, @@ -100,5 +115,8 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + create_csv.set_upstream(validation_task, flow=self) + create_csv.set_upstream(df, flow=self) adls_upload.set_upstream(create_csv, flow=self) diff --git a/viadot/flows/bigquery_to_adls.py b/viadot/flows/bigquery_to_adls.py index 8e8095b5a..e09981ebe 100644 --- a/viadot/flows/bigquery_to_adls.py +++ b/viadot/flows/bigquery_to_adls.py @@ -15,6 +15,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, BigQueryToDF @@ -40,6 +41,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any], @@ -78,6 +80,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (dict, optional): An optional dictionary to verify the received dataframe. + When passed, `validate_df` task validation tests are triggered. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" @@ -91,6 +95,9 @@ def __init__( self.vault_name = vault_name self.credentials_key = credentials_key + # Validate DataFrame + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.overwrite = overwrite_adls self.adls_sp_credentials_secret = adls_sp_credentials_secret @@ -140,6 +147,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_to_be_loaded = df_map_mixed_dtypes_for_parquet( @@ -184,6 +197,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_with_metadata, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/cloud_for_customers_report_to_adls.py b/viadot/flows/cloud_for_customers_report_to_adls.py index 332698060..386a7224e 100644 --- a/viadot/flows/cloud_for_customers_report_to_adls.py +++ b/viadot/flows/cloud_for_customers_report_to_adls.py @@ -9,6 +9,7 @@ df_to_csv, df_to_parquet, union_dfs_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, C4CReportToDF, C4CToDF from viadot.utils import slugify @@ -38,6 +39,7 @@ def __init__( adls_sp_credentials_secret: str = None, if_empty: str = "warn", if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -49,16 +51,16 @@ def __init__( Args: name (str): The name of the flow. report_url (str, optional): The url to the API. Defaults to None. - url (str, optional): ??? - endpoint (str, optional): ??? - params (dict, optional): ??? - fields (list, optional): ??? + url (str, optional): The url to the C4C API. Defaults to None. + endpoint (str, optional): The C4C API endpoint. Defaults to None. + params (dict, optional): The query parameters like filter by creation date time. Defaults to None. + fields (list, optional): List of columns to put in DataFrame. Defaults to None. skip (int, optional): Initial index value of reading row. Defaults to 0. top (int, optional): The value of top reading row. Defaults to 1000. channels (List[str], optional): Filtering parameters passed to the url. Defaults to None. months (List[str], optional): Filtering parameters passed to the url. Defaults to None. years (List[str], optional): Filtering parameters passed to the url. Defaults to None. - env (str, optional): ??? + env (str, optional): The credentials environments. Defaults to 'QA'. c4c_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with username and password for the Cloud for Customers instance. local_file_path (str, optional): Local destination path. Defaults to None. @@ -72,6 +74,8 @@ def __init__( Defaults to None. if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn". if_exists (str, optional): What to do if the local file already exists. Defaults to "replace". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. 
Defaults to 3600. """ @@ -82,6 +86,7 @@ def __init__( self.if_empty = if_empty self.env = env self.c4c_credentials_secret = c4c_credentials_secret + self.validate_df_dict = validate_df_dict self.timeout = timeout # AzureDataLakeUpload @@ -197,6 +202,10 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) if self.output_file_extension == ".parquet": @@ -223,6 +232,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) df_to_file.set_upstream(df_with_metadata, flow=self) file_to_adls_task.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/customer_gauge_to_adls.py b/viadot/flows/customer_gauge_to_adls.py index 8053aeda3..39225330c 100644 --- a/viadot/flows/customer_gauge_to_adls.py +++ b/viadot/flows/customer_gauge_to_adls.py @@ -17,6 +17,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, CustomerGaugeToDF @@ -52,6 +53,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any] @@ -92,6 +94,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. + If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" # CustomerGaugeToDF @@ -105,6 +109,9 @@ def __init__( self.end_date = end_date self.customer_gauge_credentials_secret = customer_gauge_credentials_secret + # validate_df + self.validate_df_dict = validate_df_dict + # anonymize_df self.anonymize = anonymize self.columns_to_anonymize = columns_to_anonymize @@ -169,6 +176,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + customerg_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(customerg_df, flow=self) + if self.anonymize == True: anonymized_df = anonymize_df.bind( customerg_df, @@ -228,6 +241,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/eurostat_to_adls.py b/viadot/flows/eurostat_to_adls.py index 0348d7de4..08a8677d6 100644 --- a/viadot/flows/eurostat_to_adls.py +++ b/viadot/flows/eurostat_to_adls.py @@ -15,6 +15,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from ..tasks import AzureDataLakeUpload, EurostatToDF @@ -40,6 +41,7 @@ def __init__( adls_file_name: str = None, adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, + validate_df_dict: dict = None, if_exists: str = "replace", *args: List[Any], **kwargs: Dict[str, Any], @@ -70,6 +72,8 @@ def __init__( ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. + If defined, triggers the `validate_df` task from task_utils. Defaults to None if_exists (str, optional): What to do if the file exists. Defaults to "replace". 
""" @@ -79,6 +83,9 @@ def __init__( self.base_url = base_url self.requested_columns = requested_columns + # validate df + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.overwrite = overwrite_adls self.adls_sp_credentials_secret = adls_sp_credentials_secret @@ -123,6 +130,12 @@ def gen_flow(self) -> Flow: df = df.bind(flow=self) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) @@ -165,6 +178,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/genesys_to_adls.py b/viadot/flows/genesys_to_adls.py index 4f8c54f3e..830c02c71 100644 --- a/viadot/flows/genesys_to_adls.py +++ b/viadot/flows/genesys_to_adls.py @@ -5,7 +5,7 @@ import pandas as pd from prefect import Flow, task -from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload +from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload, validate_df from viadot.tasks.genesys import GenesysToCSV @@ -95,6 +95,7 @@ def __init__( overwrite_adls: bool = True, adls_sp_credentials_secret: str = None, credentials_genesys: Dict[str, Any] = None, + validate_df_dict: Dict[str, Any] = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -143,6 +144,8 @@ def __init__( adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. credentials(dict, optional): Credentials for the genesys api. Defaults to None. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -165,6 +168,7 @@ def __init__( self.start_date = start_date self.end_date = end_date self.sep = sep + self.validate_df_dict = validate_df_dict self.timeout = timeout # AzureDataLake @@ -183,6 +187,7 @@ def gen_flow(self) -> Flow: timeout=self.timeout, local_file_path=self.local_file_path, sep=self.sep, + validate_df_dict=self.validate_df_dict, ) file_names = to_csv.bind( diff --git a/viadot/flows/hubspot_to_adls.py b/viadot/flows/hubspot_to_adls.py index 8f79d5d1b..87f5e2504 100644 --- a/viadot/flows/hubspot_to_adls.py +++ b/viadot/flows/hubspot_to_adls.py @@ -13,6 +13,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, HubspotToDF @@ -33,6 +34,7 @@ def __init__( adls_dir_path: str = None, if_exists: Literal["replace", "append", "delete"] = "replace", overwrite_adls: bool = True, + validate_df_dict: Dict[str, Any] = None, vault_name: str = None, sp_credentials_secret: str = None, *args: List[any], @@ -75,6 +77,8 @@ def __init__( output_file_extension (str, optional): Output file extension. Defaults to ".parquet". if_exists (Literal, optional): What to do if the table exists. 
Defaults to "replace". overwrite_adls (str, optional): Whether to overwrite the destination file in ADLS. Defaults to True. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. """ @@ -86,6 +90,7 @@ def __init__( self.hubspot_credentials = hubspot_credentials self.hubspot_credentials_key = hubspot_credentials_key self.output_file_extension = output_file_extension + self.validate_df_dict = validate_df_dict self.local_file_path = ( local_file_path or self.slugify(name) + self.output_file_extension @@ -137,6 +142,12 @@ def gen_flow(self) -> Flow: df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_to_be_loaded = df_map_mixed_dtypes_for_parquet( df_viadot_downloaded, dtypes_dict, flow=self ) @@ -172,6 +183,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_viadot_downloaded.set_upstream(validation_task, flow=self) + df_viadot_downloaded.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_viadot_downloaded, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/mediatool_to_adls.py b/viadot/flows/mediatool_to_adls.py index f87a6293b..156cfeef5 100644 --- a/viadot/flows/mediatool_to_adls.py +++ b/viadot/flows/mediatool_to_adls.py @@ -16,6 +16,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from ..tasks import AzureDataLakeUpload, MediatoolToDF @@ -41,6 +42,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: Dict[str, Any] = None, *args: List[Any], **kwargs: Dict[str, Any], ): @@ -66,6 +68,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. 
""" # MediatoolToDF self.organization_ids = organization_ids @@ -73,6 +77,7 @@ def __init__( self.mediatool_credentials_key = mediatool_credentials_key self.media_entries_columns = media_entries_columns self.vault_name = vault_name + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.overwrite = overwrite_adls @@ -119,6 +124,11 @@ def gen_flow(self) -> Flow: media_entries_columns=self.media_entries_columns, flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) df_casted_to_str = cast_df_to_str(df_with_metadata, flow=self) @@ -163,6 +173,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/mysql_to_adls.py b/viadot/flows/mysql_to_adls.py index 4c18148fe..c8c4c9c44 100644 --- a/viadot/flows/mysql_to_adls.py +++ b/viadot/flows/mysql_to_adls.py @@ -2,7 +2,7 @@ from prefect import Flow -from viadot.task_utils import df_to_csv +from viadot.task_utils import df_to_csv, validate_df from viadot.tasks import AzureDataLakeUpload from viadot.tasks.mysql_to_df import MySqlToDf @@ -20,6 +20,7 @@ def __init__( to_path: str = None, if_exists: Literal["replace", "append", "delete"] = "replace", overwrite_adls: bool = True, + validate_df_dict: dict = None, sp_credentials_secret: str = None, credentials_secret: str = None, timeout: int = 3600, @@ -41,6 +42,8 @@ def __init__( to_path (str): The path to an ADLS file. Defaults to None. if_exists (Literal, optional): What to do if the table exists. Defaults to "replace". overwrite_adls (str, optional): Whether to overwrite_adls the destination file. Defaults to True. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. + If defined, triggers the `validate_df` task from task_utils. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. credentials_secret (str, optional): Key Vault name. Defaults to None. 
@@ -57,6 +60,9 @@ def __init__( self.vault_name = vault_name self.overwrite_adls = overwrite_adls + # validate df + self.validate_df_dict = validate_df_dict + # Upload to ADLS self.file_path = file_path self.sep = sep @@ -76,6 +82,12 @@ def gen_flow(self) -> Flow: credentials_secret=self.credentials_secret, query=self.query, flow=self ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + create_csv = df_to_csv.bind( df, path=self.file_path, @@ -93,5 +105,8 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + create_csv.set_upstream(validation_task, flow=self) + create_csv.set_upstream(df, flow=self) adls_upload.set_upstream(create_csv, flow=self) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index 3f24a65d8..dfeef1302 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -9,6 +9,7 @@ df_to_csv, df_to_parquet, union_dfs_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, OutlookToDF @@ -29,6 +30,7 @@ def __init__( limit: int = 10000, timeout: int = 3600, if_exists: Literal["append", "replace", "skip"] = "append", + validate_df_dict: dict = None, outlook_credentials_secret: str = "OUTLOOK", *args: List[Any], **kwargs: Dict[str, Any], @@ -54,6 +56,8 @@ def __init__( timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append". + validate_df_dict (dict, optional): An optional dictionary to verify the received dataframe. + When passed, `validate_df` task validation tests are triggered. Defaults to None. 
""" self.mailbox_list = mailbox_list @@ -65,6 +69,9 @@ def __init__( self.local_file_path = local_file_path self.if_exsists = if_exists + # Validate DataFrame + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.adls_file_path = adls_file_path self.output_file_extension = output_file_extension @@ -98,6 +105,13 @@ def gen_flow(self) -> Flow: dfs = apply_map(self.gen_outlook_df, self.mailbox_list, flow=self) df = union_dfs_task.bind(dfs, flow=self) + + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) if self.output_file_extension == ".parquet": @@ -124,6 +138,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_with_metadata.set_upstream(df, flow=self) df_to_file.set_upstream(df_with_metadata, flow=self) file_to_adls_task.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/salesforce_to_adls.py b/viadot/flows/salesforce_to_adls.py index fe84be381..11dec54fd 100644 --- a/viadot/flows/salesforce_to_adls.py +++ b/viadot/flows/salesforce_to_adls.py @@ -16,6 +16,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, SalesforceToDF @@ -41,6 +42,7 @@ def __init__( adls_file_name: str = None, adls_sp_credentials_secret: str = None, if_exists: str = "replace", + validate_df_dict: Dict[str, Any] = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any], @@ -70,6 +72,8 @@ def __init__( ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" @@ -82,6 +86,7 @@ def __init__( self.env = env self.vault_name = vault_name self.credentials_secret = credentials_secret + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.adls_sp_credentials_secret = adls_sp_credentials_secret @@ -135,6 +140,13 @@ def gen_flow(self) -> Flow: df_clean = df_clean_column.bind(df=df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df_clean, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_to_be_loaded = df_map_mixed_dtypes_for_parquet( df_with_metadata, dtypes_dict, flow=self ) @@ -177,6 +189,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_clean.set_upstream(validation_task, flow=self) + df_clean.set_upstream(df, flow=self) df_with_metadata.set_upstream(df_clean, flow=self) dtypes_dict.set_upstream(df_with_metadata, flow=self) diff --git a/viadot/flows/sap_bw_to_adls.py b/viadot/flows/sap_bw_to_adls.py index 375f30775..90b965f92 100644 --- a/viadot/flows/sap_bw_to_adls.py +++ b/viadot/flows/sap_bw_to_adls.py @@ -14,6 +14,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, SAPBWToDF @@ -35,6 +36,7 @@ def __init__( overwrite_adls: bool = True, vault_name: str = None, sp_credentials_secret: str = None, + validate_df_dict: dict = None, *args: List[any], **kwargs: Dict[str, Any], ): @@ -56,6 +58,7 @@ def __init__( overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True. vault_name (str, optional): The name of the vault from which to obtain the secrets.. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. 
""" self.sapbw_credentials = sapbw_credentials self.sapbw_credentials_key = sapbw_credentials_key @@ -89,6 +92,7 @@ def __init__( self.overwrite_adls = overwrite_adls self.vault_name = vault_name self.sp_credentials_secret = sp_credentials_secret + self.validate_df_dict = validate_df_dict super().__init__(*args, name=name, **kwargs) self.gen_flow() @@ -110,6 +114,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self) @@ -156,6 +166,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_viadot_downloaded.set_upstream(validation_task, flow=self) + df_viadot_downloaded.set_upstream(df, flow=self) dtypes_dict.set_upstream(df_viadot_downloaded, flow=self) df_to_be_loaded.set_upstream(dtypes_dict, flow=self) diff --git a/viadot/flows/sap_rfc_to_adls.py b/viadot/flows/sap_rfc_to_adls.py index 0d56efeec..d7a2ac390 100644 --- a/viadot/flows/sap_rfc_to_adls.py +++ b/viadot/flows/sap_rfc_to_adls.py @@ -1,8 +1,8 @@ from typing import Any, Dict, List, Literal -from prefect import Flow, task, unmapped +from prefect import Flow -from viadot.task_utils import concat_dfs, df_to_csv, df_to_parquet, set_new_kv +from viadot.task_utils import df_to_csv, df_to_parquet, set_new_kv, validate_df from viadot.tasks import AzureDataLakeUpload, SAPRFCToDF @@ -27,8 +27,9 @@ def __init__( vault_name: str = None, update_kv: bool = False, filter_column: str = None, - timeout: int = 3600, alternative_version: bool = False, + validate_df_dict: Dict[str, Any] = None, + timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], ): @@ -77,9 +78,11 @@ def __init__( vault_name(str, optional): The name of the vault from which to obtain the secrets. Defaults to None. update_kv (bool, optional): Whether or not to update key value on Prefect. Defaults to False. filter_column (str, optional): Name of the field based on which key value will be updated. Defaults to None. + alternative_version (bool, optional): Enable the use version 2 in source. Defaults to False. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. - alternative_version (bool, optional): Enable the use version 2 in source. Defaults to False. 
""" self.query = query self.rfc_sep = rfc_sep @@ -96,8 +99,9 @@ def __init__( self.overwrite = overwrite self.adls_sp_credentials_secret = adls_sp_credentials_secret self.vault_name = vault_name - self.timeout = timeout self.alternative_version = alternative_version + self.validate_df_dict = validate_df_dict + self.timeout = timeout self.update_kv = update_kv self.filter_column = filter_column @@ -119,6 +123,11 @@ def gen_flow(self) -> Flow: credentials=self.sap_credentials, flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) if self.output_file_extension == ".parquet": df_to_file = df_to_parquet.bind( @@ -145,6 +154,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_to_file.set_upstream(validation_task, flow=self) + df_to_file.set_upstream(df, flow=self) adls_upload.set_upstream(df_to_file, flow=self) diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index df5562221..eaf747bab 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -3,12 +3,10 @@ from typing import Any, Dict, List import pendulum -from prefect import Flow, task +from prefect import Flow from prefect.backend import set_key_value from prefect.utilities import logging -logger = logging.get_logger() - from viadot.task_utils import ( add_ingestion_metadata_task, df_get_data_types_task, @@ -16,9 +14,13 @@ df_to_csv, df_to_parquet, dtypes_to_json_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF +from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF + + +logger = logging.get_logger() class SharepointToADLS(Flow): @@ -38,6 +40,7 @@ def __init__( overwrite_adls: bool = False, if_empty: str = "warn", if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -62,6 +65,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" @@ -74,6 +79,7 @@ def __init__( self.sheet_number = sheet_number self.validate_excel_file = validate_excel_file self.timeout = timeout + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.overwrite = overwrite_adls @@ -117,6 +123,10 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_mapped = df_map_mixed_dtypes_for_parquet.bind( @@ -158,6 +168,197 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + + df_mapped.set_upstream(df_with_metadata, flow=self) + dtypes_to_json_task.set_upstream(df_mapped, flow=self) + df_to_file.set_upstream(dtypes_to_json_task, flow=self) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() + + +class SharepointListToADLS(Flow): + def __init__( + self, + name: str, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + sp_cert_credentials_secret: str = None, + vault_name: str = None, + path: str = None, + adls_dir_path: str = None, + adls_file_name: str = None, + adls_sp_credentials_secret: str = None, + overwrite_adls: bool = True, + output_file_extension: str = ".parquet", + validate_df_dict: dict = None, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Run Flow SharepointListToADLS. + + Args: + name (str): Prefect flow name. + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', # conjuction operators allowed only when 2 values passed + 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None. + vault_name (str): KeyVaultSecret name. Default to None. + path (str): Local file path. Default to None. + adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. 
+ adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, + CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + + Returns: + .parquet file inside ADLS. + """ + + # SharepointListToDF + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.sp_cert_credentials_secret = sp_cert_credentials_secret + self.vault_name = vault_name + self.row_count = row_count + self.validate_df_dict = validate_df_dict + + # AzureDataLakeUpload + self.adls_dir_path = adls_dir_path + self.adls_file_name = adls_file_name + self.overwrite = overwrite_adls + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + if self.path is not None: + self.local_file_path = ( + self.path + self.slugify(name) + self.output_file_extension + ) + else: + self.local_file_path = self.slugify(name) + self.output_file_extension + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + def gen_flow(self) -> Flow: + s = SharepointListToDF( + path=self.path, + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + credentials_secret=self.sp_cert_credentials_secret, + ) + df = s.run() + + if self.validate_df_dict: + validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation_task.set_upstream(df, flow=self) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.path, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload() + file_to_adls_task.bind( + from_path=self.path, + to_path=self.adls_dir_path, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_to_json_task.bind( + dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self + ) + + json_to_adls_task = AzureDataLakeUpload() + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + df_mapped.set_upstream(df_with_metadata, flow=self) dtypes_to_json_task.set_upstream(df_mapped, flow=self) df_to_file.set_upstream(dtypes_to_json_task, flow=self) diff --git 
a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 38255a38f..80253eb88 100644 --- a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -19,6 +19,7 @@ union_dfs_task, update_dtypes_dict, write_to_json, + validate_df, ) from viadot.tasks import ( AzureDataLakeUpload, @@ -68,6 +69,7 @@ def __init__( vault_name: str = None, check_missing_data: bool = True, timeout: int = 3600, + validate_df_dict: dict = None, *args: List[any], **kwargs: Dict[str, Any], ): @@ -112,6 +114,8 @@ def __init__( check_missing_data (bool, optional): Whether to check missing data. Defaults to True. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. + validate_df_dict (Dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. + If defined, triggers the `validate_df` task from task_utils. Defaults to None. """ if not ds_user: try: @@ -140,6 +144,9 @@ def __init__( self.if_exists = if_exists self.output_file_extension = output_file_extension + # validate_df + self.validate_df_dict = validate_df_dict + # RunGreatExpectationsValidation self.expectation_suite = expectation_suite self.expectations_path = "/home/viadot/tmp/expectations" @@ -229,6 +236,13 @@ def gen_flow(self) -> Flow: else: df = self.gen_supermetrics_task(ds_accounts=self.ds_accounts, flow=self) + # run validate_df task from task_utils + if self.validate_df_dict: + validation_df_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_df_task.set_upstream(df, flow=self) + write_json = write_to_json.bind( dict_=self.expectation_suite, path=os.path.join( @@ -301,6 +315,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_df_task, flow=self) + write_json.set_upstream(df, flow=self) validation.set_upstream(write_json, flow=self) df_with_metadata.set_upstream(validation_upstream, flow=self) diff --git a/viadot/flows/vid_club_to_adls.py b/viadot/flows/vid_club_to_adls.py index 59d676c51..40f53d8ae 100644 --- a/viadot/flows/vid_club_to_adls.py +++ b/viadot/flows/vid_club_to_adls.py @@ -16,6 +16,7 @@ df_to_parquet, dtypes_to_json_task, update_dtypes_dict, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, VidClubToDF @@ -44,6 +45,7 @@ def __init__( adls_sp_credentials_secret: str = None, overwrite_adls: bool = False, if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any] @@ -75,6 +77,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
""" # VidClubToDF @@ -89,6 +93,9 @@ def __init__( self.vidclub_credentials_secret = vidclub_credentials_secret self.vidclub_vault_name = vidclub_vault_name + # validate_df + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.adls_file_name = adls_file_name self.adls_dir_path = adls_dir_path @@ -147,6 +154,12 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation_task = validate_df.bind( + vid_club_df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(vid_club_df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(vid_club_df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) @@ -191,6 +204,9 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + df_with_metadata.set_upstream(validation_task, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 094cec14e..c0d96abe2 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -8,7 +8,7 @@ from .outlook import Outlook from .salesforce import Salesforce from .sftp import SftpConnector -from .sharepoint import Sharepoint +from .sharepoint import Sharepoint, SharepointList from .supermetrics import Supermetrics try: diff --git a/viadot/sources/cloud_for_customers.py b/viadot/sources/cloud_for_customers.py index 4856439cf..1bb68375e 100644 --- a/viadot/sources/cloud_for_customers.py +++ b/viadot/sources/cloud_for_customers.py @@ -35,8 +35,8 @@ def __init__( report_url (str, optional): The url to the API in case of prepared report. Defaults to None. url (str, optional): The url to the API. Defaults to None. endpoint (str, optional): The endpoint of the API. Defaults to None. - params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format. - env (str, optional): The development environments. Defaults to 'QA'. + params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to None. + env (str, optional): The credentials environments. Defaults to 'QA'. credentials (Dict[str, Any], optional): The credentials are populated with values from config file or this parameter. Defaults to None than use credentials from local_config. """ diff --git a/viadot/sources/sap_bw.py b/viadot/sources/sap_bw.py index 73b1d2efa..8f4fb0583 100644 --- a/viadot/sources/sap_bw.py +++ b/viadot/sources/sap_bw.py @@ -25,6 +25,7 @@ def __init__(self, credentials: dict, *args, **kwargs): Raises: CredentialError: If provided credentials are incorrect. 
""" + self.credentials = credentials if credentials is None: raise CredentialError("Missing credentials.") diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 61eda17f2..096de825b 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,10 +1,26 @@ -from typing import Any, Dict - -import sharepy - from ..config import local_config from ..exceptions import CredentialError from .base import Source +from viadot.utils import get_nested_dict + +from typing import Any, Dict, List +from fnmatch import fnmatch +from datetime import datetime +from copy import deepcopy +import pandas as pd + +import sharepy +from office365.runtime.auth.authentication_context import AuthenticationContext +from office365.sharepoint.client_context import ClientContext +from office365.runtime.client_request_exception import ClientRequestException +from prefect.utilities import logging + + +logger = logging.get_logger() + +# Print out how many rows was extracted in specific iteration +def log_of_progress(items): + logger.info("Items read: {0}".format(len(items))) class Sharepoint(Source): @@ -64,3 +80,475 @@ def download_file( url=download_from_path, filename=download_to_path, ) + + +class SharepointList(Source): + """ + A Sharepoint_List class to connect and download data from sharpoint lists. + + Args: + credentials (dict): Credentials should include: + - "tenant" + - "client_id" + - "scopes" + - "thumbprint" + - "private_key" + """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + *args, + **kwargs, + ): + DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT") + credentials = credentials or DEFAULT_CREDENTIALS + if credentials is None: + raise CredentialError("Credentials not found.") + + super().__init__(*args, credentials=credentials, **kwargs) + + def get_connection( + self, + site_url: str = None, + ): + + # Connecting into Sharepoint with AuthenticationContext + try: + auth_context = AuthenticationContext(site_url) + auth_context.with_client_certificate( + tenant=self.credentials["TENANT"], + client_id=self.credentials["CLIENT_ID"], + scopes=[self.credentials["SCOPES"]], + thumbprint=self.credentials["THUMBPRINT"], + private_key=self.credentials["PRIVATE_KEY"], + ) + + self.ctx = ClientContext(site_url, auth_context) + logger.info("Successfully connect to Sharepoint Lists") + + except Exception as ex: + logger.info(f"Error at ClientContext or authentication level: {ex}") + + return self.ctx + + # Function for extracting list items from search fields + def _unpack_fields( + self, + list_item, + selected_fields: dict = None, + ): + + # Creating the body of dictionary + new_dict = dict() + + # For loop scanning the propertys of searching fields + item_values_dict = list_item.properties + for field, val in item_values_dict.items(): + nested_dict = get_nested_dict(val) + # Check if the dictionary is nested + if nested_dict != None: + # It might be that there are different field properties than expected + nested_value = nested_dict.get(selected_fields["FieldProperty"]) + if nested_value != None: + new_dict[field] = nested_value + else: + logger.info("I'm not the right value") + raise ValueError + else: + new_dict[field] = val + + return new_dict + + def get_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + ): + + ctx = self.get_connection(site_url=site_url) + + # Get list of lists object by List Title + self.list_object = ctx.web.lists.get_by_title(list_title) + list_fields_all = 
self.list_object.fields + + # Get all or specifics list of objects + if required_fields is None: + ctx.load(list_fields_all) + ctx.execute_query() + + return list_fields_all + + else: + list_fields_required = [ + list_fields_all.get_by_internal_name_or_title(field).get() + for field in required_fields + ] + ctx.execute_batch() + + return list_fields_required + + def select_expandable_user_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + ): + """ + Method to expand fields and get more informations. + field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. + -> more properties can be discovered by getting list.item.properties. + Default to "Title" + """ + + list_fields = self.get_fields( + list_title=list_title, site_url=site_url, required_fields=required_fields + ) + + # Finding the "selected" fields + fields_to_select = [ + field.properties["InternalName"] + f"/{field_property}" + if fnmatch(field.properties["TypeAsString"], "User*") + else field.properties["InternalName"] + for field in list_fields + ] + # Finding the "expanded" fields + fields_to_expand = [ + field.properties["InternalName"] + for field in list_fields + if fnmatch(field.properties["TypeAsString"], f"User*") + ] + + # Creating the body of the function output + selected_fields = { + "FieldInternalNames": fields_to_select, + "FieldToExpand": fields_to_expand, + "FieldProperty": field_property, + } + + return selected_fields + + def check_filters( + self, + filters: dict = None, + ) -> bool: + """ + Function to check if filters dict is valid. + example1: if operator2 is present value2 must be in place as well + example2: if dtype is not on allowed list it will throw an error + """ + + allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"] + allowed_conjuction = ["&", "|"] + allowed_operators = ["<", ">", "<=", ">=", "==", "!="] + + for parameters in filters.values(): + if parameters.get("dtype") not in allowed_dtypes: + raise ValueError( + f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}." + ) + if parameters.get("operator1"): + if parameters.get("operator1") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}." + ) + if not parameters.get("value1"): + raise ValueError("Value for operator1 is missing!") + elif not parameters.get("operator1"): + raise ValueError("Operator1 is missing!") + if ( + not parameters.get("operator2") + and parameters.get("operators_conjuction") is not None + ): + raise ValueError( + f"Operator conjuction allowed only with more than one filter operator!" + ) + if parameters.get("operator2"): + if parameters.get("operator2") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}." + ) + if not parameters.get("value2"): + raise ValueError("Value for operator2 is missing!") + if not parameters.get("operators_conjuction"): + raise ValueError( + f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty." + ) + if parameters.get("operators_conjuction") not in allowed_conjuction: + raise ValueError( + f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}." 
+ ) + if parameters.get("filters_conjuction"): + if ( + len(filters.keys()) == 1 + and parameters.get("filters_conjuction") is not None + ): + raise ValueError( + f"Filters conjuction allowed only with more than one filter column!" + ) + if parameters.get("filters_conjuction") not in allowed_conjuction: + raise ValueError( + f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}." + ) + + return True + + def operators_mapping( + self, + filters: dict = None, + ) -> dict: + """ + Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + New modified dict. + """ + + filters_dict = deepcopy(filters) + operators = { + "<": "lt", + ">": "gt", + "<=": "le", + ">=": "ge", + "==": "eq", + "!=": "ne", + } + logical_op = {"&": "and", "|": "or"} + + for parameters in filters_dict.values(): + if parameters.get("operator1"): + operator1_to_change = parameters.get("operator1") + if operator1_to_change in operators.keys(): + parameters["operator1"] = operators[operator1_to_change] + else: + raise ValueError( + f"This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operator2"): + operator2_to_change = parameters.get("operator2") + if operator2_to_change in operators.keys(): + parameters["operator2"] = operators[operator2_to_change] + else: + raise ValueError( + f"This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operators_conjuction"): + logical_op_to_change = parameters.get("operators_conjuction") + if logical_op_to_change in logical_op.keys(): + parameters["operators_conjuction"] = logical_op[ + logical_op_to_change + ] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("filters_conjuction"): + logical_fl_to_change = parameters.get("filters_conjuction") + if logical_fl_to_change in logical_op.keys(): + parameters["filters_conjuction"] = logical_op[logical_fl_to_change] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!" + ) + + return filters_dict + + def make_filter_for_api(self, filters: dict) -> "str": + """ + Function changing type of operators to match MS API style as 'str' passing to URL call. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter parameter to API. 
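To make the two helpers above concrete: `check_filters` and `operators_mapping` work purely on the `filters` dict and never call the API, so dummy credentials are enough to construct the source for a quick offline illustration (all credential values below are placeholders):

```python
# Offline sketch of the filter helpers; the credentials only satisfy the constructor.
from viadot.sources import SharepointList

sp_list = SharepointList(
    credentials={
        "TENANT": "dummy",
        "CLIENT_ID": "dummy",
        "SCOPES": "dummy",
        "THUMBPRINT": "dummy",
        "PRIVATE_KEY": "dummy",
    }
)

filters = {
    "Created": {
        "dtype": "datetime",
        "value1": "2023-01-01",
        "value2": "2023-06-30",
        "operator1": ">=",
        "operator2": "<=",
        "operators_conjuction": "&",
    }
}

sp_list.check_filters(filters)  # returns True or raises ValueError
mapped = sp_list.operators_mapping(filters)
# mapped["Created"] now carries the API-style operators:
# 'operator1' -> 'ge', 'operator2' -> 'le', 'operators_conjuction' -> 'and'
```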
+ """ + + filter_text = "" + filters_mod = self.operators_mapping(filters) + + for column, parameters in filters_mod.items(): + if parameters.get("dtype") in ["datetime", "date"]: + from_date1 = datetime.strptime( + parameters.get("value1"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " + ) + if parameters.get("operator2"): + from_date2 = datetime.strptime( + parameters.get("value2"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + ) + elif parameters.get("dtype") not in ["datetime", "date"]: + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" + ) + if parameters.get("operator2"): + filter_text = ( + filter_text + + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" + ) + if parameters.get("filters_conjuction"): + filter_text = filter_text + f"{parameters.get('filters_conjuction')} " + + return filter_text + + def make_filter_for_df( + self, + filters: dict = None, + ) -> "str": + """ + Function changing dict operators into pandas DataFrame filters. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter to DataFrame. + """ + + filter_in_df = "df.loc[" + + for column, parameters in filters.items(): + filter_in_df = ( + filter_in_df + + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + ) + + if parameters.get("operator2"): + filter_in_df = ( + filter_in_df + + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + ) + + if parameters.get("filters_conjuction"): + filter_in_df = filter_in_df + ")" + parameters.get("filters_conjuction") + + else: + filter_in_df = filter_in_df + ")" + + filter_in_df = filter_in_df + "]" + + return filter_in_df + + def list_item_to_df( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + ): + """ + Method to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. 
+ + Returns: + pd.DataFrame + """ + + # checking if the passed filters dictionary is correctly built + if filters is not None: + self.check_filters(filters) + # checking that the filter parameters are included in the desired field parameters + for key in filters: + if key not in required_fields: + raise AttributeError( + f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!" + ) + + # changing the body of the filter for MS API call + filter_text = self.make_filter_for_api(filters) + + download_all = False + + # extracting required_fields SP_List objects + selected_fields = self.select_expandable_user_fields( + list_title=list_title, + site_url=site_url, + required_fields=required_fields, + field_property=field_property, + ) + + try: + # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. + if filters is None: + raise ValueError("There is no filter. Starting extraction of all data") + else: + list_items = ( + self.list_object.items.filter(filter_text) + .select(selected_fields["FieldInternalNames"]) + .top(row_count) + .expand(selected_fields["FieldToExpand"]) + ) + self.ctx.load(list_items) + self.ctx.execute_query() + + except (ClientRequestException, ValueError) as e: + # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFrame level. + logger.info(f"Exception SPQueryThrottledException occurred: {e}") + list_items = ( + self.list_object.items.get_all(row_count, log_of_progress) + .select(selected_fields["FieldInternalNames"]) + .expand(selected_fields["FieldToExpand"]) + ) + self.ctx.load(list_items) + self.ctx.execute_query() + download_all = True + + df = pd.DataFrame( + [self._unpack_fields(row_item, selected_fields) for row_item in list_items] + ) + + if download_all == True and filters is not None: + # Apply the requested filters on the DataFrame level + self.logger.info("Filtering df with all data output") + filter_for_df = self.make_filter_for_df(filters) + df = eval(filter_for_df) + + return df diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 28442a739..6173e2994 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -2,6 +2,7 @@ import json import os import shutil +import re from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast @@ -23,7 +24,7 @@ from visions.typesets.complete_set import CompleteSet from viadot.config import local_config -from viadot.exceptions import CredentialError +from viadot.exceptions import CredentialError, ValidationError from viadot.tasks import AzureDataLakeUpload, AzureKeyVaultSecret logger = logging.get_logger() @@ -668,3 +669,126 @@ def anonymize_df( df.drop(columns=["temp_date_col"], inplace=True, errors="ignore") return df + + +@task(timeout=3600) +def validate_df(df: pd.DataFrame, tests: dict = None) -> None: + """ + Task to validate the data on DataFrame level. All numbers in the ranges are inclusive. + tests: + - `column_size`: dict{column: size} + - `column_unique_values`: list[columns] + - `column_list_to_match`: list[columns] + - `dataset_row_count`: dict: {'min': number, 'max': number} + - `column_match_regex`: dict: {column: 'regex'} + - `column_sum`: dict: {column: {'min': number, 'max': number}} + + Args: + df (pd.DataFrame): The data frame for validation. + tests (dict, optional): Tests to apply on the data frame. Defaults to None.
+ + Raises: + ValidationError: If validation failed for at least one test. + """ + failed_tests = 0 + failed_tests_list = [] + + if tests is not None: + if "column_size" in tests: + try: + for k, v in tests["column_size"].items(): + column_max_length = ( + df.astype(str).apply(lambda s: s.str.len()).max().to_dict() + ) + try: + if v == column_max_length[k]: + logger.info(f"[column_size] for {k} passed.") + else: + logger.error( + f"[column_size] test for {k} failed. field lenght is different than {v}" + ) + failed_tests += 1 + failed_tests_list.append("column_size error") + except Exception as e: + logger.error(f"{e}") + except TypeError as e: + logger.error( + "Please provide `column_size` parameter as dictionary {'columns': value}." + ) + + if "column_unique_values" in tests: + for column in tests["column_unique_values"]: + df_size = df.shape[0] + if df[column].nunique() == df_size: + logger.info( + f"[column_unique_values] Values are unique for {column} column." + ) + else: + failed_tests += 1 + failed_tests_list.append("column_unique_values error") + logger.error( + f"[column_unique_values] Values for {column} are not unique." + ) + + if "column_list_to_match" in tests: + if set(tests["column_list_to_match"]) == set(df.columns): + logger.info(f"[column_list_to_match] passed.") + else: + failed_tests += 1 + failed_tests_list.append("column_list_to_match error") + logger.error( + "[column_list_to_match] failed. Columns are different than expected." + ) + + if "dataset_row_count" in tests: + row_count = len(df.iloc[:, 0]) + max_value = tests["dataset_row_count"]["max"] or 100_000_000 + min_value = tests["dataset_row_count"]["min"] or 0 + + if (row_count > min_value) and (row_count < max_value): + logger.info("[dataset_row_count] passed.") + else: + failed_tests += 1 + failed_tests_list.append("dataset_row_count error") + logger.error( + f"[dataset_row_count] Row count ({row_count}) is not between {min_value} and {max_value}." + ) + + if "column_match_regex" in tests: + for k, v in tests["column_match_regex"].items(): + try: + matches = df[k].apply(lambda x: bool(re.match(v, str(x)))) + if all(matches): + logger.info(f"[column_match_regex] on {k} column passed.") + else: + failed_tests += 1 + failed_tests_list.append("column_match_regex error") + logger.error(f"[column_match_regex] on {k} column failed!") + except Exception as e: + failed_tests += 1 + failed_tests_list.append("column_match_regex error") + logger.error(f"[column_match_regex] Error in {k} column: {e}") + + if "column_sum" in tests: + for column, bounds in tests["column_sum"].items(): + col_sum = df[column].sum() + min_bound = bounds["min"] + max_bound = bounds["max"] + if min_bound <= col_sum <= max_bound: + logger.info( + f"[column_sum] Sum of {col_sum} for {column} is within the expected range." + ) + else: + failed_tests += 1 + failed_tests_list.append("column_sum error") + logger.error( + f"[column_sum] Sum of {col_sum} for {column} is out of the expected range - <{min_bound}:{max_bound}>" + ) + else: + return "No dataframe tests to run." 
+ + if failed_tests > 0: + failed_tests_msg = ", ".join(failed_tests_list) + raise ValidationError( + f"Validation failed for {failed_tests} test/tests: {failed_tests_msg}" + ) diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 02791852b..ecba1d5c5 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF +from .sharepoint import SharepointToDF, SharepointListToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF @@ -57,4 +57,4 @@ from .sql_server import SQLServerCreateTable, SQLServerQuery, SQLServerToDF from .vid_club import VidClubToDF from .git import CloneRepo -from .luma import LumaIngest \ No newline at end of file +from .luma import LumaIngest diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index 403ed7081..de47ddebf 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,6 +10,7 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs +from viadot.task_utils import * from viadot.exceptions import APIError from viadot.sources import Genesys @@ -33,6 +34,7 @@ def __init__( conversationId_list: List[str] = None, key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, + validate_df_dict: Dict[str, Any] = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any], @@ -54,6 +56,8 @@ def __init__( sep (str, optional): Separator in csv file. Defaults to "\t". conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -72,6 +76,7 @@ def __init__( self.sep = sep self.conversationId_list = conversationId_list self.key_list = key_list + self.validate_df_dict = validate_df_dict super().__init__( name=self.report_name, @@ -293,6 +298,7 @@ def merge_conversations_dfs(self, data_to_merge: list) -> DataFrame: "credentials_genesys", "conversationId_list", "key_list", + "validate_df_dict", ) def run( self, @@ -309,6 +315,7 @@ def run( conversationId_list: List[str] = None, key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, + validate_df_dict: Dict[str, Any] = None, ) -> List[str]: """ Task for downloading data from the Genesys API to DF. @@ -327,6 +334,8 @@ def run( report_columns (List[str], optional): List of exisiting column in report. Defaults to None. conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. 
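A quick, self-contained illustration of the `validate_df` task added to `task_utils` above, called directly through `.run()`:

```python
import pandas as pd

from viadot.exceptions import ValidationError
from viadot.task_utils import validate_df

df = pd.DataFrame({"type": ["banner", "banner"], "country": ["PL", "DE"]})

# All tests pass, nothing is raised:
validate_df.run(
    df=df,
    tests={
        "column_list_to_match": ["type", "country"],
        "dataset_row_count": {"min": 1, "max": 10},
        "column_size": {"country": 2},
    },
)

# A failing test raises ValidationError:
try:
    validate_df.run(df=df, tests={"column_unique_values": ["type"]})
except ValidationError as e:
    print(e)  # Validation failed for 1 test/tests: column_unique_values error
```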
Returns: List[str]: List of file names. @@ -450,7 +459,8 @@ def run( date = start_date.replace("-", "") file_name = f"conversations_detail_{date}".upper() + ".csv" - + if validate_df_dict: + validate_df.run(df=final_df, tests=validate_df_dict) final_df.to_csv( os.path.join(self.local_file_path, file_name), index=False, @@ -477,6 +487,8 @@ def run( key: value for (key, value) in attributes.items() if key in key_list } temp_dict["conversationId"] = json_file["id"] + temp_dict["startTime"] = json_file["startTime"] + temp_dict["endTime"] = json_file["endTime"] data_list.append(temp_dict) df = pd.DataFrame(data_list) @@ -486,6 +498,8 @@ def run( end = end_date.replace("-", "") file_name = f"WEBMESSAGE_{start}-{end}.csv" + if validate_df_dict: + validate_df.run(df=df, tests=validate_df_dict) df.to_csv( os.path.join(file_name), index=False, diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py index 4d34f9960..acc92c246 100644 --- a/viadot/tasks/sap_bw.py +++ b/viadot/tasks/sap_bw.py @@ -1,6 +1,7 @@ import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret +from viadot.tasks import AzureKeyVaultSecret from prefect.utilities import logging from viadot.exceptions import ValidationError @@ -27,13 +28,14 @@ def __init__( sapbw_credentials_key (str, optional): Azure KV secret. Defaults to "SAP". env (str, optional): SAP environment. Defaults to "BW". """ - if sapbw_credentials is None: - self.sapbw_credentials = credentials_loader.run( - credentials_secret=sapbw_credentials_key - ).get(env) + if not sapbw_credentials: + credentials_str = AzureKeyVaultSecret( + sapbw_credentials_key, + ).run() + self.sapbw_credentials = json.loads(credentials_str).get(env) else: - self.sapbw_credentials = sapbw_credentials + self.sapbw_credentials = PrefectSecret("SAP_BW").run() super().__init__( name="sapbw_to_df", diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py index 7ba9c4d41..2a1cb0bc4 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,16 +1,17 @@ +from typing import List +import pandas as pd import copy import json import os -from typing import List +import re -import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs from ..exceptions import ValidationError -from ..sources import Sharepoint +from ..sources import Sharepoint, SharepointList from .azure_key_vault import AzureKeyVaultSecret from ..utils import add_viadot_metadata_columns @@ -230,3 +231,165 @@ def run( df = self.df_replace_special_chars(df) self.logger.info(f"Successfully converted data to a DataFrame.") return df + + +class SharepointListToDF(Task): + """ + Task to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. 
+ allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + + Returns: + pandas DataFrame + """ + + def __init__( + self, + path: str = None, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.row_count = row_count + self.vault_name = vault_name + self.credentials_secret = credentials_secret + + if not credentials_secret: + # Attempt to read a default for the service principal secret name + try: + credentials_secret = PrefectSecret("SHAREPOINT-CERT").run() + except ValueError: + pass + + if credentials_secret: + credentials_str = AzureKeyVaultSecret( + secret=credentials_secret, vault_name=self.vault_name + ).run() + self.credentials = json.loads(credentials_str) + + super().__init__( + *args, + **kwargs, + ) + + def __call__(self): + """Download Sharepoint_List data to a .parquet file""" + super().__call__(self) + + def _convert_camel_case_to_words(self, input_str: str) -> str: + + self.input_str = input_str + + words = re.findall(r"[A-Z][a-z]*|[0-9]+", self.input_str) + converted = " ".join(words) + + return converted + + def change_column_name( + self, + df: pd.DataFrame = None, + ): + s = SharepointList(credentials=self.credentials) + list_fields = s.get_fields( + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + ) + + self.logger.info("Changing column names") + column_names_correct = [field.properties["Title"] for field in list_fields] + column_names_code = [field.properties["InternalName"] for field in list_fields] + dictionary = dict(zip(column_names_code, column_names_correct)) + + # If there are duplicate "Title" names, fall back to "InternalName" + value_count = {} + duplicates = [] + + for key, value in dictionary.items(): + if value in value_count: + if value_count[value] not in duplicates: + duplicates.append(value_count[value]) + duplicates.append(key) + else: + value_count[value] = key + + for key in duplicates: + dictionary[key] = self._convert_camel_case_to_words(key) + + # Rename column names inside the DataFrame + df = df.rename(columns=dictionary) + + return df + + def run( + self, + ) -> pd.DataFrame: + """ + Run Task SharepointListToDF.
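The fallback renaming above is easier to see in isolation: when two list fields resolve to the same display `Title`, the `InternalName` is split from camel case into words so the resulting DataFrame keeps distinct column names. A tiny sketch using the same regex as `_convert_camel_case_to_words`:

```python
import re


def convert_camel_case_to_words(input_str: str) -> str:
    # Same regex as SharepointListToDF._convert_camel_case_to_words
    words = re.findall(r"[A-Z][a-z]*|[0-9]+", input_str)
    return " ".join(words)


print(convert_camel_case_to_words("ModifiedBy"))   # Modified By
print(convert_camel_case_to_words("Field2Value"))  # Field 2 Value
```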
+ + Returns: + pd.DataFrame + """ + + s = SharepointList( + credentials=self.credentials, + ) + df_raw = s.list_item_to_df( + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + ) + + df = self.change_column_name(df=df_raw) + self.logger.info("Successfully changed the structure of the DataFrame") + + return df diff --git a/viadot/utils.py b/viadot/utils.py index 2b4c80538..d05cfdd95 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -449,3 +449,15 @@ def wrapper(*args, **kwargs) -> pd.DataFrame: return wrapper return decorator + + +def get_nested_dict(d): + """Follow nested dict values in d and return the deepest dict reached; return None if d is not a dict.""" + if isinstance(d, dict): + for lvl in d.values(): + if isinstance(lvl, dict): + return get_nested_dict(lvl) + else: + return d + else: + return None
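Finally, a short illustration of the new `get_nested_dict` helper: it follows nested dict values and returns the innermost dict it reaches, which `SharepointList._unpack_fields` uses to pull the expanded property (e.g. `Title`) out of user/lookup fields:

```python
from viadot.utils import get_nested_dict

print(get_nested_dict({"AuthorId": {"Title": "Jane Doe", "Email": "jane@example.com"}}))
# {'Title': 'Jane Doe', 'Email': 'jane@example.com'}

print(get_nested_dict({"Title": "Plain value"}))
# {'Title': 'Plain value'}  -- a dict without nested dicts is returned as-is

print(get_nested_dict("not a dict"))
# None
```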