From d6590aed9a7fe955f9153bf7f8819f74bddbc4b6 Mon Sep 17 00:00:00 2001 From: marcinpurtak Date: Thu, 26 Oct 2023 13:08:14 +0200 Subject: [PATCH 1/3] Sharepoint list connector --- CHANGELOG.md | 9 +- tests/integration/test_sharepoint.py | 152 +++++++++- viadot/flows/__init__.py | 4 +- viadot/flows/sharepoint_to_adls.py | 207 ++++++++++++- viadot/sources/__init__.py | 4 +- viadot/sources/sharepoint.py | 424 ++++++++++++++++++++++++++- viadot/tasks/__init__.py | 2 +- viadot/tasks/sharepoint.py | 175 ++++++++++- viadot/utils.py | 11 + 9 files changed, 970 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0be2f82..2ede484c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Added `validate_df` task to task_utils. +- Added `SharepointList` source class. +- Added `SharepointListToDF` task class. +- Added `SharepointListToADLS` flow class. +- Added tests for `SharepointList`. +- Added `get_nested_dict` to untils.py. +- Added `validate_df` task to `SharepointToADLS` class. + ### Fixed ### Changed @@ -618,4 +625,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret - Moved from poetry to pip ### Fixed -- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part +- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part \ No newline at end of file diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index c784fa682..b56d10182 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,6 +1,8 @@ import os +import re import pandas as pd +from copy import deepcopy import pytest from prefect.tasks.secrets import PrefectSecret @@ -9,6 +11,7 @@ from viadot.sources import Sharepoint from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF +from viadot.sources import SharepointList def get_url() -> str: @@ -18,7 +21,7 @@ def get_url() -> str: Returns: str: File URL. """ - return local_config["SHAREPOINT"].get("url") + return local_config["SHAREPOINT"].get("file_url") @pytest.fixture(scope="session") @@ -163,3 +166,150 @@ def test_get_data_types(file_name): dtypes = dtypes_map.values() assert "String" in dtypes + + +# testing get_connection function passing invalid credentials and raises AuthenticationContext error. +def test_get_connection(): + site_url = "https://velux.sharepoint.com/" + credentials ={ + "SHAREPOINT_CERT": + {"TENANT": "xxx", + "CLIENT_ID": "123", + "SCOPES": "https://velux.sharepoint.com/", + "THUMBPRINT": "xyz", + "PRIVATE_KEY": "private"} + } + + spl = SharepointList(credentials=credentials) + with pytest.raises(AttributeError, match="'SharepointList' object has no attribute 'ctx'"): + spl.get_connection(site_url=site_url) + + +@pytest.fixture(scope="session") +def sharepoint_list(): + """ + Fixture for creating a Sharepoint class instance. + The class instance can be used within a test functions to interact with Sharepoint. + """ + spl = SharepointList() + yield spl + + +def test_valid_filters(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '<', 'value1': 10}, + 'filter2': {'dtype': 'str', 'operator1': '==', 'value1': 'value'}, + } + result = sharepoint_list.check_filters(filters) + assert result is True + +def test_invalid_dtype(sharepoint_list): + filters = { + 'filter1': {'dtype': 'list', 'operator1': '>', 'value1': 10}, + } + with pytest.raises(ValueError, match="dtype not allowed!"): + sharepoint_list.check_filters(filters) + +def test_missing_operator1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'value1': 10}, + } + with pytest.raises(ValueError, match="Operator1 is missing!"): + sharepoint_list.check_filters(filters) + +def test_invalid_operator1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '*', 'value1': 10}, + } + with pytest.raises(ValueError, match="Operator type not allowed!"): + sharepoint_list.check_filters(filters) + +def test_missing_value1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': None}, + } + with pytest.raises(ValueError, match="Value for operator1 is missing!"): + sharepoint_list.check_filters(filters) + +def test_missing_operators_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20}, + } + with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + sharepoint_list.check_filters(filters) + +def test_invalid_operators_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20, 'operators_conjuction': '!'}, + } + with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + +def test_invalid_filters_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'filters_conjuction': '!'}, + } + with pytest.raises(ValueError, match="Filters operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + +def test_valid_mapping(sharepoint_list): + filters = { + 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, + 'filter2': {'operator1': '==', 'operator2': '!=', 'operators_conjuction': '|'}, + } + expected_result = { + 'filter1': {'operator1': 'gt', 'operator2': 'le', 'operators_conjuction': 'and', 'filters_conjuction': 'or'}, + 'filter2': {'operator1': 'eq', 'operator2': 'ne', 'operators_conjuction': 'or'}, + } + result = sharepoint_list.operators_mapping(deepcopy(filters)) + assert result == expected_result + +def test_invalid_comparison_operator(sharepoint_list): + filters = { + 'filter1': {'operator1': '*', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, + } + error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + +def test_invalid_logical_operator(sharepoint_list): + filters = { + 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '!', 'filters_conjuction': '|'}, + } + error_message = "This conjuction(logical) operator: ! is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + +def test_single_filter_datetime_api(sharepoint_list): + filters = { + 'date_column': {'dtype': 'datetime', 'operator1': '>', 'value1': '2023-01-01'} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "date_column gt datetime'2023-01-01T00:00:00' " + assert result == expected_result + +def test_multiple_filters_api(sharepoint_list): + filters = { + 'int_column': {'dtype': 'int', 'operator1': '>=', 'value1': 10, 'operator2': '<', 'value2': 20}, + 'str_column': {'dtype': 'str', 'operator1': '==', 'value1': 'example'} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "int_column ge '10'int_column lt '20'str_column eq 'example'" + assert result == expected_result + +def test_single_df_filter(sharepoint_list): + filters = { + 'column1': {'operator1': '>', 'value1': 10} + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')]" + assert result == expected_result + +def test_multiple_df_filters(sharepoint_list): + filters = { + 'column1': {'operator1': '>', 'value1': 10, 'filters_conjuction': '&'}, + 'column2': {'operator1': '<', 'value1': 20} + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')&(df.column2 < '20')]" + assert result == expected_result \ No newline at end of file diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index de2f618ab..ed6b5a004 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS +from .sharepoint_to_adls import SharepointToADLS, SharepointListToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL @@ -46,4 +46,4 @@ from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform from .transform_and_catalog import TransformAndCatalogToLuma -from .vid_club_to_adls import VidClubToADLS +from .vid_club_to_adls import VidClubToADLS \ No newline at end of file diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index df5562221..c234b4a88 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -3,12 +3,10 @@ from typing import Any, Dict, List import pendulum -from prefect import Flow, task +from prefect import Flow from prefect.backend import set_key_value from prefect.utilities import logging -logger = logging.get_logger() - from viadot.task_utils import ( add_ingestion_metadata_task, df_get_data_types_task, @@ -16,9 +14,13 @@ df_to_csv, df_to_parquet, dtypes_to_json_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF +from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF + + +logger = logging.get_logger() class SharepointToADLS(Flow): @@ -38,6 +40,7 @@ def __init__( overwrite_adls: bool = False, if_empty: str = "warn", if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -62,6 +65,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -74,6 +79,7 @@ def __init__( self.sheet_number = sheet_number self.validate_excel_file = validate_excel_file self.timeout = timeout + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.overwrite = overwrite_adls @@ -117,6 +123,10 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_mapped = df_map_mixed_dtypes_for_parquet.bind( @@ -169,3 +179,192 @@ def gen_flow(self) -> Flow: @staticmethod def slugify(name): return name.replace(" ", "_").lower() + + + +class SharepointListToADLS(Flow): + def __init__( + self, + name: str, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + sp_cert_credentials_secret: str = None, + vault_name: str = None, + path: str = None, + adls_dir_path: str = None, + adls_file_name: str = None, + adls_sp_credentials_secret: str = None, + overwrite_adls: bool = True, + output_file_extension: str = ".parquet", + validate_df_dict: dict = None, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Run Flow SharepointListToADLS. + + Args: + name (str): Prefect flow name. + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', # conjuction operators allowed only when 2 values passed + 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None. + vault_name (str): KeyVaultSecret name. Default to None. + path (str): Local file path. Default to None. + adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, + CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + + Returns: + .parquet file inside ADLS. + """ + + # SharepointListToDF + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.sp_cert_credentials_secret = sp_cert_credentials_secret + self.vault_name = vault_name + self.row_count = row_count + self.validate_df_dict = validate_df_dict + + + # AzureDataLakeUpload + self.adls_dir_path = adls_dir_path + self.adls_file_name = adls_file_name + self.overwrite = overwrite_adls + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + if self.path is not None: + self.local_file_path = ( + self.path + self.slugify(name) + self.output_file_extension + ) + else: + self.local_file_path = self.slugify(name) + self.output_file_extension + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + def gen_flow(self) -> Flow: + s = SharepointListToDF( + path = self.path, + list_title = self.list_title, + site_url = self.site_url, + required_fields = self.required_fields, + field_property = self.field_property, + filters = self.filters, + row_count = self.row_count, + credentials_secret=self.sp_cert_credentials_secret, + ) + df = s.run() + + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.path, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload() + file_to_adls_task.bind( + from_path=self.path, + to_path=self.adls_dir_path, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_to_json_task.bind( + dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self + ) + + + json_to_adls_task = AzureDataLakeUpload() + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + df_mapped.set_upstream(df_with_metadata, flow=self) + dtypes_to_json_task.set_upstream(df_mapped, flow=self) + df_to_file.set_upstream(dtypes_to_json_task, flow=self) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() \ No newline at end of file diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 094cec14e..8308c78c9 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -8,7 +8,7 @@ from .outlook import Outlook from .salesforce import Salesforce from .sftp import SftpConnector -from .sharepoint import Sharepoint +from .sharepoint import Sharepoint, SharepointList from .supermetrics import Supermetrics try: @@ -33,4 +33,4 @@ # APIS from .uk_carbon_intensity import UKCarbonIntensity -from .vid_club import VidClub +from .vid_club import VidClub \ No newline at end of file diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 61eda17f2..028499c94 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,10 +1,26 @@ -from typing import Any, Dict - -import sharepy - from ..config import local_config from ..exceptions import CredentialError from .base import Source +from viadot.utils import get_nested_dict + +from typing import Any, Dict, List +from fnmatch import fnmatch +from datetime import datetime +from copy import deepcopy +import pandas as pd + +import sharepy +from office365.runtime.auth.authentication_context import AuthenticationContext +from office365.sharepoint.client_context import ClientContext +from office365.runtime.client_request_exception import ClientRequestException +from prefect.utilities import logging + + +logger = logging.get_logger() + +# Print out how many rows was extracted in specific iteration +def log_of_progress(items): + logger.info("Items read: {0}".format(len(items))) class Sharepoint(Source): @@ -64,3 +80,403 @@ def download_file( url=download_from_path, filename=download_to_path, ) + + + +class SharepointList(Source): + """ + A Sharepoint_List class to connect and download data from sharpoint lists. + + Args: + credentials (dict): Credentials should include: + - "tenant" + - "client_id" + - "scopes" + - "thumbprint" + - "private_key" + """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + *args, + **kwargs, + ): + DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT") + credentials = credentials or DEFAULT_CREDENTIALS + if credentials is None: + raise CredentialError("Credentials not found.") + + super().__init__(*args, credentials=credentials, **kwargs) + + + def get_connection( + self, + site_url: str = None, + ): + + # Connecting into Sharepoint with AuthenticationContext + try: + auth_context = AuthenticationContext(site_url) + auth_context.with_client_certificate(tenant=self.credentials["TENANT"], + client_id=self.credentials["CLIENT_ID"], + scopes=[self.credentials["SCOPES"]], + thumbprint=self.credentials["THUMBPRINT"], + private_key=self.credentials["PRIVATE_KEY"]) + + self.ctx = ClientContext(site_url, auth_context) + logger.info("Successfully connect to Sharepoint Lists") + + except Exception as ex: + logger.info(f"Error at ClientContext or authentication level: {ex}") + + return self.ctx + + + # Function for extracting list items from search fields + def _unpack_fields( + self, + list_item, + selected_fields: dict = None, + ): + + # Creating the body of dictionary + new_dict = dict() + + # For loop scanning the propertys of searching fields + item_values_dict = list_item.properties + for field,val in item_values_dict.items(): + nested_dict = get_nested_dict(val) + # Check if the dictionary is nested + if nested_dict != None: + # It might be that there are different field properties than expected + nested_value = nested_dict.get(selected_fields["FieldProperty"]) + if nested_value != None: + new_dict[field] = nested_value + else: + logger.info("I'm not the right value") + raise ValueError + else: + new_dict[field] = val + + return new_dict + + + def get_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + ): + + ctx = self.get_connection(site_url = site_url) + + # Get list of lists object by List Title + self.list_object = ctx.web.lists.get_by_title(list_title) + list_fields_all = self.list_object.fields + + # Get all or specifics list of objects + if required_fields is None: + ctx.load(list_fields_all) + ctx.execute_query() + + return list_fields_all + + else: + list_fields_required = [list_fields_all.get_by_internal_name_or_title(field).get() + for field in required_fields] + ctx.execute_batch() + + return list_fields_required + + + def select_expandable_user_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + ): + """ + Method to expand fields and get more informations. + field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. + -> more properties can be discovered by getting list.item.properties. + Default to "Title" + """ + + list_fields = self.get_fields(list_title=list_title, + site_url=site_url, + required_fields=required_fields) + + # Finding the "selected" fields + fields_to_select = [field.properties['InternalName'] +f"/{field_property}" + if fnmatch(field.properties['TypeAsString'],"User*") + else field.properties['InternalName'] for field in list_fields] + # Finding the "expanded" fields + fields_to_expand = [field.properties['InternalName'] for field in list_fields + if fnmatch(field.properties['TypeAsString'],f"User*")] + + # Creating the body of the function output + selected_fields = {"FieldInternalNames" : fields_to_select, "FieldToExpand": fields_to_expand, "FieldProperty": field_property} + + return selected_fields + + + def check_filters( + self, + filters: dict = None, + ) -> bool: + """ + Function to check if filters dict is valid. + example1: if operator2 is present value2 must be in place as well + example2: if dtype is not on allowed list it will throw an error + """ + + allowed_dtypes = ['datetime','date','bool','int', 'float', 'complex', 'str'] + allowed_conjuction = ['&','|'] + allowed_operators = ['<','>','<=','>=','==','!='] + + for parameters in filters.values(): + if parameters.get('dtype') not in allowed_dtypes: + raise ValueError(f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}.") + if parameters.get('operator1'): + if parameters.get('operator1') not in allowed_operators: + raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}.") + if not parameters.get('value1'): + raise ValueError('Value for operator1 is missing!') + elif not parameters.get('operator1') : raise ValueError('Operator1 is missing!') + if not parameters.get('operator2') and parameters.get('operators_conjuction') is not None: + raise ValueError(f"Operator conjuction allowed only with more than one filter operator!") + if parameters.get('operator2'): + if parameters.get('operator2') not in allowed_operators: + raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}.") + if not parameters.get('value2'): + raise ValueError('Value for operator2 is missing!') + if not parameters.get('operators_conjuction'): + raise ValueError(f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty.") + if parameters.get('operators_conjuction') not in allowed_conjuction: + raise ValueError(f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}.") + if parameters.get('filters_conjuction'): + if len(filters.keys()) == 1 and parameters.get('filters_conjuction') is not None: + raise ValueError(f"Filters conjuction allowed only with more than one filter column!") + if parameters.get('filters_conjuction') not in allowed_conjuction: + raise ValueError(f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}.") + + return True + + + def operators_mapping( + self, + filters: dict = None, + ) -> dict: + """ + Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + New modified dict. + """ + + filters_dict = deepcopy(filters) + operators = { + '<' : 'lt', + '>' : 'gt', + '<=' : 'le', + '>=' : 'ge', + '==' : 'eq', + '!=' : 'ne' + } + logical_op = { + '&' : 'and', + '|' : 'or' + } + + for parameters in filters_dict.values(): + if parameters.get('operator1'): + operator1_to_change = parameters.get('operator1') + if operator1_to_change in operators.keys(): + parameters['operator1'] = operators[operator1_to_change] + else: raise ValueError(f'This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('operator2'): + operator2_to_change = parameters.get('operator2') + if operator2_to_change in operators.keys(): + parameters['operator2'] = operators[operator2_to_change] + else: raise ValueError(f'This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('operators_conjuction'): + logical_op_to_change = parameters.get('operators_conjuction') + if logical_op_to_change in logical_op.keys(): + parameters['operators_conjuction'] = logical_op[logical_op_to_change] + else: raise ValueError(f'This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('filters_conjuction'): + logical_fl_to_change = parameters.get('filters_conjuction') + if logical_fl_to_change in logical_op.keys(): + parameters['filters_conjuction'] = logical_op[logical_fl_to_change] + else: raise ValueError(f'This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!') + + return filters_dict + + + def make_filter_for_api( + self, + filters: dict + ) -> 'str': + """ + Function changing type of operators to match MS API style as 'str' passing to URL call. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter parameter to API. + """ + + filter_text = '' + filters_mod = self.operators_mapping(filters) + + for column, parameters in filters_mod.items(): + if parameters.get('dtype') in ['datetime','date']: + from_date1 = datetime.strptime(parameters.get('value1'),'%Y-%m-%d').isoformat() + filter_text = filter_text + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " + if parameters.get('operator2'): + from_date2 = datetime.strptime(parameters.get('value2'),'%Y-%m-%d').isoformat() + filter_text = filter_text + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + elif parameters.get('dtype') not in ['datetime','date']: + filter_text = filter_text + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" + if parameters.get('operator2'): + filter_text = filter_text + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" + if parameters.get('filters_conjuction'): + filter_text = filter_text + f"{parameters.get('filters_conjuction')} " + + return filter_text + + + def make_filter_for_df( + self, + filters: dict = None, + ) -> 'str': + """ + Function changing dict operators into pandas DataFrame filters. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter to DataFrame. + """ + + filter_in_df = 'df.loc[' + + for column, parameters in filters.items(): + filter_in_df = filter_in_df + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + + if parameters.get('operator2'): + filter_in_df = filter_in_df + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + + if parameters.get('filters_conjuction'): + filter_in_df = filter_in_df + ')' + parameters.get('filters_conjuction') + + else: + filter_in_df = filter_in_df + ')' + + filter_in_df = filter_in_df + ']' + + return filter_in_df + + + def list_item_to_df( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + ): + """ + Method to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + + Returns: + pd.DataFrame + """ + + # checking if the passed filters dictionary is correctly built + if filters is not None: + self.check_filters(filters) + # checking that the filter parameters are included in the desired field parameters + for key in filters: + if key not in required_fields: + raise AttributeError(f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!") + + # changing the body of the filter for MS API call + filter_text = self.make_filter_for_api(filters) + + download_all = False + + # extracting requeird_fields SP_List objects + selected_fields = self.select_expandable_user_fields(list_title=list_title, + site_url=site_url, + required_fields=required_fields, + field_property=field_property) + + try: + # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. + if filters is None: + raise ValueError("There is no filter. Starting extraxction all data") + else: + list_items= self.list_object.items.filter(filter_text).select(selected_fields["FieldInternalNames"]).top(row_count).expand(selected_fields["FieldToExpand"]) + self.ctx.load(list_items) + self.ctx.execute_query() + + except (ClientRequestException, ValueError) as e: + # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFreame level. + logger.info(f"Exception SPQueryThrottledException occurred: {e}") + list_items = self.list_object.items.get_all(row_count,log_of_progress).select(selected_fields["FieldInternalNames"]).expand(selected_fields["FieldToExpand"]) + self.ctx.load(list_items) + self.ctx.execute_query() + download_all = True + + df = pd.DataFrame([self._unpack_fields(row_item,selected_fields) for row_item in list_items]) + + if download_all == True and filters is not None: + # Filter for desired range of created date and for factory Namyslow PL + self.logger.info("Filtering df with all data output") + filter_for_df = self.make_filter_for_df(filters) + df = eval(filter_for_df) + + return df \ No newline at end of file diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 02791852b..1e67d96a6 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF +from .sharepoint import SharepointToDF, SharepointListToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py index 7ba9c4d41..cb7f30a19 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,16 +1,17 @@ +from typing import List +import pandas as pd import copy import json import os -from typing import List +import re -import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs from ..exceptions import ValidationError -from ..sources import Sharepoint +from ..sources import Sharepoint, SharepointList from .azure_key_vault import AzureKeyVaultSecret from ..utils import add_viadot_metadata_columns @@ -230,3 +231,171 @@ def run( df = self.df_replace_special_chars(df) self.logger.info(f"Successfully converted data to a DataFrame.") return df + + + +class SharepointListToDF(Task): + """ + Task to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + + Returns: + pandas DataFrame + """ + + def __init__( + self, + path: str = None, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.row_count = row_count + self.vault_name = vault_name + self.credentials_secret = credentials_secret + + if not credentials_secret: + # Attempt to read a default for the service principal secret name + try: + credentials_secret = PrefectSecret("SHAREPOINT-CERT").run() + except ValueError: + pass + + if credentials_secret: + credentials_str = AzureKeyVaultSecret( + secret = self.credentials_secret, vault_name=self.vault_name + ).run() + self.credentials = json.loads(credentials_str) + + + super().__init__( + *args, + **kwargs, + ) + + + def __call__(self): + """Download Sharepoint_List data to a .parquet file""" + super().__call__(self) + + + def _convert_camel_case_to_words( + self, + input_str: str) -> str: + + self.input_str = input_str + + words = re.findall(r'[A-Z][a-z]*|[0-9]+', self.input_str) + converted = ' '.join(words) + + return converted + + + def change_column_name( + self, + df: pd.DataFrame = None, + ): + s = SharepointList() + list_fields = s.get_fields( + list_title= self.list_title, + site_url= self.site_url, + required_fields= self.required_fields, + ) + + self.logger.info("Changing columns names") + column_names_correct = [field.properties['Title'] for field in list_fields] + column_names_code = [field.properties['InternalName'] for field in list_fields] + dictionary = dict(zip(column_names_code, column_names_correct)) + + # If duplicates in names from "Title" take "InternalName" + value_count = {} + duplicates = [] + + for key, value in dictionary.items(): + if value in value_count: + if value_count[value] not in duplicates: + duplicates.append(value_count[value]) + duplicates.append(key) + else: + value_count[value] = key + + for key in duplicates: + dictionary[key] = self._convert_camel_case_to_words(key) + + # Rename columns names inside DataFrame + df = df.rename(columns=dictionary) + + return df + + + def run( + self, + ) -> None: + """ + Run Task SharepointListToDF. + + Returns: + pd.DataFrame + """ + + s = SharepointList(credentials=self.credentials,) + df_raw = s.list_item_to_df( + list_title = self.list_title, + site_url = self.site_url, + required_fields = self.required_fields, + field_property = self.field_property, + filters = self.filters, + row_count = self.row_count, + ) + + df = self.change_column_name(df=df_raw) + self.logger.info("Successfully changed structure of the DataFrame") + + return df \ No newline at end of file diff --git a/viadot/utils.py b/viadot/utils.py index 2b4c80538..49fa33e09 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -449,3 +449,14 @@ def wrapper(*args, **kwargs) -> pd.DataFrame: return wrapper return decorator + + +def get_nested_dict(d): + if isinstance(d, dict): + for lvl in d.values(): + if isinstance(lvl, dict): + return get_nested_dict(lvl) + else: + return d + else: + return None \ No newline at end of file From dceee62735e4f6327f317be55e8811f3d567cb08 Mon Sep 17 00:00:00 2001 From: Angelika Tarnawa Date: Thu, 26 Oct 2023 13:46:34 +0200 Subject: [PATCH 2/3] =?UTF-8?q?=E2=9C=A8=20Added=20fetch=20all?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 006f22e39..ad2f298bd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -73,6 +73,7 @@ jobs: git config --global user.email 'github-actions[bot]@users.noreply.github.com' git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY black . + git fetch --all git checkout $GITHUB_HEAD_REF git commit -am "🎨 Format Python code with Black" git push From d436bd3762a10056b5f7b634dbc8c706d4f45f05 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 26 Oct 2023 11:50:34 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=8E=A8=20Format=20Python=20code=20wit?= =?UTF-8?q?h=20Black?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flows/test_bigquery_to_adls.py | 9 +- ...test_cloud_for_customers_report_to_adls.py | 2 +- .../flows/test_customer_gauge_to_adls.py | 2 +- .../integration/flows/test_hubspot_to_adls.py | 2 +- .../flows/test_mediatool_to_adls.py | 2 +- tests/integration/flows/test_mysql_to_adls.py | 1 + .../flows/test_salesforce_to_adls.py | 2 +- .../integration/flows/test_sap_bw_to_adls.py | 2 +- .../integration/flows/test_sap_rfc_to_adls.py | 2 +- .../flows/test_supermetrics_to_adls.py | 2 +- .../integration/flows/test_vidclub_to_adls.py | 2 +- tests/integration/test_sharepoint.py | 128 +++-- tests/unit/test_task_utils.py | 2 +- tests/unit/test_utils.py | 2 +- viadot/flows/__init__.py | 4 +- viadot/flows/sharepoint_to_adls.py | 47 +- viadot/flows/supermetrics_to_adls.py | 2 +- viadot/flows/transform_and_catalog.py | 4 +- viadot/sources/__init__.py | 2 +- viadot/sources/bigquery.py | 2 +- viadot/sources/mindful.py | 4 +- viadot/sources/sharepoint.py | 454 ++++++++++-------- viadot/task_utils.py | 2 +- viadot/tasks/__init__.py | 6 +- viadot/tasks/genesys.py | 2 +- viadot/tasks/luma.py | 2 + viadot/tasks/sap_bw.py | 2 +- viadot/tasks/sharepoint.py | 84 ++-- viadot/utils.py | 2 +- 29 files changed, 451 insertions(+), 328 deletions(-) diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py index de793344a..b4503c6e9 100644 --- a/tests/integration/flows/test_bigquery_to_adls.py +++ b/tests/integration/flows/test_bigquery_to_adls.py @@ -1,15 +1,14 @@ import os +from unittest import mock +import pandas as pd import pendulum import pytest -from unittest import mock -import pandas as pd - from prefect.tasks.secrets import PrefectSecret -from viadot.flows import BigQueryToADLS -from viadot.tasks import AzureDataLakeRemove from viadot.exceptions import ValidationError +from viadot.flows import BigQueryToADLS +from viadot.tasks import AzureDataLakeRemove ADLS_DIR_PATH = "raw/tests/" ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet" diff --git a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py index f0661e314..b0c3128c5 100644 --- a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py +++ b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py @@ -1,6 +1,6 @@ from viadot.config import local_config -from viadot.flows import CloudForCustomersReportToADLS from viadot.exceptions import ValidationError +from viadot.flows import CloudForCustomersReportToADLS def test_cloud_for_customers_report_to_adls(): diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index 0e7afd3e2..ac0716d66 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import CustomerGaugeToADLS from viadot.exceptions import ValidationError +from viadot.flows import CustomerGaugeToADLS DATA = { "user_name": ["Jane", "Bob"], diff --git a/tests/integration/flows/test_hubspot_to_adls.py b/tests/integration/flows/test_hubspot_to_adls.py index d960fc079..e0c06c20f 100644 --- a/tests/integration/flows/test_hubspot_to_adls.py +++ b/tests/integration/flows/test_hubspot_to_adls.py @@ -5,8 +5,8 @@ import pandas as pd import pytest -from viadot.flows import HubspotToADLS from viadot.exceptions import ValidationError +from viadot.flows import HubspotToADLS DATA = { "id": {"0": "820306930"}, diff --git a/tests/integration/flows/test_mediatool_to_adls.py b/tests/integration/flows/test_mediatool_to_adls.py index d7b5b2658..65cfadf8f 100644 --- a/tests/integration/flows/test_mediatool_to_adls.py +++ b/tests/integration/flows/test_mediatool_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import MediatoolToADLS from viadot.exceptions import ValidationError +from viadot.flows import MediatoolToADLS DATA = {"country": ["DK", "DE"], "sales": [3, 4]} ADLS_FILE_NAME = "test_mediatool.parquet" diff --git a/tests/integration/flows/test_mysql_to_adls.py b/tests/integration/flows/test_mysql_to_adls.py index 942bab99d..c968d48a3 100644 --- a/tests/integration/flows/test_mysql_to_adls.py +++ b/tests/integration/flows/test_mysql_to_adls.py @@ -1,4 +1,5 @@ from unittest import mock + from viadot.flows.mysql_to_adls import MySqlToADLS query = """SELECT * FROM `example-views`.`sales`""" diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py index ec68a1227..8c032f308 100644 --- a/tests/integration/flows/test_salesforce_to_adls.py +++ b/tests/integration/flows/test_salesforce_to_adls.py @@ -2,9 +2,9 @@ from prefect.tasks.secrets import PrefectSecret +from viadot.exceptions import ValidationError from viadot.flows import SalesforceToADLS from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError ADLS_FILE_NAME = "test_salesforce.parquet" ADLS_DIR_PATH = "raw/tests/" diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py index 2c01049e8..4259e5c16 100644 --- a/tests/integration/flows/test_sap_bw_to_adls.py +++ b/tests/integration/flows/test_sap_bw_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import SAPBWToADLS from viadot.exceptions import ValidationError +from viadot.flows import SAPBWToADLS DATA = { "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], diff --git a/tests/integration/flows/test_sap_rfc_to_adls.py b/tests/integration/flows/test_sap_rfc_to_adls.py index ed33fa320..5503b4684 100644 --- a/tests/integration/flows/test_sap_rfc_to_adls.py +++ b/tests/integration/flows/test_sap_rfc_to_adls.py @@ -1,8 +1,8 @@ from viadot.config import local_config +from viadot.exceptions import ValidationError from viadot.flows import SAPRFCToADLS from viadot.sources import AzureDataLake from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError try: import pyrfc diff --git a/tests/integration/flows/test_supermetrics_to_adls.py b/tests/integration/flows/test_supermetrics_to_adls.py index 9738ddeb1..15deaa01a 100644 --- a/tests/integration/flows/test_supermetrics_to_adls.py +++ b/tests/integration/flows/test_supermetrics_to_adls.py @@ -4,8 +4,8 @@ import pytest from prefect.storage import Local -from viadot.flows import SupermetricsToADLS from viadot.exceptions import ValidationError +from viadot.flows import SupermetricsToADLS CWD = os.getcwd() adls_dir_path = "raw/tests/supermetrics" diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py index c18eaad10..0f6705579 100644 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import VidClubToADLS from viadot.exceptions import ValidationError +from viadot.flows import VidClubToADLS DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} ADLS_FILE_NAME = "test_vid_club.parquet" diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index b56d10182..f9453d7d1 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,17 +1,16 @@ import os import re +from copy import deepcopy import pandas as pd -from copy import deepcopy import pytest from prefect.tasks.secrets import PrefectSecret from viadot.config import local_config from viadot.exceptions import CredentialError -from viadot.sources import Sharepoint +from viadot.sources import Sharepoint, SharepointList from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF -from viadot.sources import SharepointList def get_url() -> str: @@ -171,17 +170,20 @@ def test_get_data_types(file_name): # testing get_connection function passing invalid credentials and raises AuthenticationContext error. def test_get_connection(): site_url = "https://velux.sharepoint.com/" - credentials ={ - "SHAREPOINT_CERT": - {"TENANT": "xxx", + credentials = { + "SHAREPOINT_CERT": { + "TENANT": "xxx", "CLIENT_ID": "123", "SCOPES": "https://velux.sharepoint.com/", "THUMBPRINT": "xyz", - "PRIVATE_KEY": "private"} - } + "PRIVATE_KEY": "private", + } + } spl = SharepointList(credentials=credentials) - with pytest.raises(AttributeError, match="'SharepointList' object has no attribute 'ctx'"): + with pytest.raises( + AttributeError, match="'SharepointList' object has no attribute 'ctx'" + ): spl.get_connection(site_url=site_url) @@ -197,119 +199,177 @@ def sharepoint_list(): def test_valid_filters(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '<', 'value1': 10}, - 'filter2': {'dtype': 'str', 'operator1': '==', 'value1': 'value'}, + "filter1": {"dtype": "int", "operator1": "<", "value1": 10}, + "filter2": {"dtype": "str", "operator1": "==", "value1": "value"}, } result = sharepoint_list.check_filters(filters) assert result is True + def test_invalid_dtype(sharepoint_list): filters = { - 'filter1': {'dtype': 'list', 'operator1': '>', 'value1': 10}, + "filter1": {"dtype": "list", "operator1": ">", "value1": 10}, } with pytest.raises(ValueError, match="dtype not allowed!"): sharepoint_list.check_filters(filters) + def test_missing_operator1(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'value1': 10}, + "filter1": {"dtype": "int", "value1": 10}, } with pytest.raises(ValueError, match="Operator1 is missing!"): sharepoint_list.check_filters(filters) + def test_invalid_operator1(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '*', 'value1': 10}, + "filter1": {"dtype": "int", "operator1": "*", "value1": 10}, } with pytest.raises(ValueError, match="Operator type not allowed!"): sharepoint_list.check_filters(filters) + def test_missing_value1(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': None}, + "filter1": {"dtype": "int", "operator1": ">", "value1": None}, } with pytest.raises(ValueError, match="Value for operator1 is missing!"): sharepoint_list.check_filters(filters) + def test_missing_operators_conjuction(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20}, + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + }, } with pytest.raises(ValueError, match="Operators for conjuction is missing!"): sharepoint_list.check_filters(filters) + def test_invalid_operators_conjuction(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20, 'operators_conjuction': '!'}, + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + "operators_conjuction": "!", + }, } with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): sharepoint_list.check_filters(filters) + def test_invalid_filters_conjuction(sharepoint_list): filters = { - 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'filters_conjuction': '!'}, + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "filters_conjuction": "!", + }, } - with pytest.raises(ValueError, match="Filters operators for conjuction not allowed!"): + with pytest.raises( + ValueError, match="Filters operators for conjuction not allowed!" + ): sharepoint_list.check_filters(filters) + def test_valid_mapping(sharepoint_list): filters = { - 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, - 'filter2': {'operator1': '==', 'operator2': '!=', 'operators_conjuction': '|'}, + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, + "filter2": {"operator1": "==", "operator2": "!=", "operators_conjuction": "|"}, } expected_result = { - 'filter1': {'operator1': 'gt', 'operator2': 'le', 'operators_conjuction': 'and', 'filters_conjuction': 'or'}, - 'filter2': {'operator1': 'eq', 'operator2': 'ne', 'operators_conjuction': 'or'}, + "filter1": { + "operator1": "gt", + "operator2": "le", + "operators_conjuction": "and", + "filters_conjuction": "or", + }, + "filter2": {"operator1": "eq", "operator2": "ne", "operators_conjuction": "or"}, } result = sharepoint_list.operators_mapping(deepcopy(filters)) assert result == expected_result + def test_invalid_comparison_operator(sharepoint_list): filters = { - 'filter1': {'operator1': '*', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, + "filter1": { + "operator1": "*", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, } error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" with pytest.raises(ValueError, match=re.escape(error_message)): sharepoint_list.operators_mapping(deepcopy(filters)) + def test_invalid_logical_operator(sharepoint_list): filters = { - 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '!', 'filters_conjuction': '|'}, + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "!", + "filters_conjuction": "|", + }, } error_message = "This conjuction(logical) operator: ! is not allowed. Please read the function documentation for details!" with pytest.raises(ValueError, match=re.escape(error_message)): sharepoint_list.operators_mapping(deepcopy(filters)) + def test_single_filter_datetime_api(sharepoint_list): filters = { - 'date_column': {'dtype': 'datetime', 'operator1': '>', 'value1': '2023-01-01'} + "date_column": {"dtype": "datetime", "operator1": ">", "value1": "2023-01-01"} } result = sharepoint_list.make_filter_for_api(filters) expected_result = "date_column gt datetime'2023-01-01T00:00:00' " assert result == expected_result + def test_multiple_filters_api(sharepoint_list): filters = { - 'int_column': {'dtype': 'int', 'operator1': '>=', 'value1': 10, 'operator2': '<', 'value2': 20}, - 'str_column': {'dtype': 'str', 'operator1': '==', 'value1': 'example'} + "int_column": { + "dtype": "int", + "operator1": ">=", + "value1": 10, + "operator2": "<", + "value2": 20, + }, + "str_column": {"dtype": "str", "operator1": "==", "value1": "example"}, } result = sharepoint_list.make_filter_for_api(filters) expected_result = "int_column ge '10'int_column lt '20'str_column eq 'example'" assert result == expected_result + def test_single_df_filter(sharepoint_list): - filters = { - 'column1': {'operator1': '>', 'value1': 10} - } + filters = {"column1": {"operator1": ">", "value1": 10}} result = sharepoint_list.make_filter_for_df(filters) expected_result = "df.loc[(df.column1 > '10')]" assert result == expected_result + def test_multiple_df_filters(sharepoint_list): filters = { - 'column1': {'operator1': '>', 'value1': 10, 'filters_conjuction': '&'}, - 'column2': {'operator1': '<', 'value1': 20} + "column1": {"operator1": ">", "value1": 10, "filters_conjuction": "&"}, + "column2": {"operator1": "<", "value1": 20}, } result = sharepoint_list.make_filter_for_df(filters) expected_result = "df.loc[(df.column1 > '10')&(df.column2 < '20')]" - assert result == expected_result \ No newline at end of file + assert result == expected_result diff --git a/tests/unit/test_task_utils.py b/tests/unit/test_task_utils.py index e77c24fdd..969b699a4 100644 --- a/tests/unit/test_task_utils.py +++ b/tests/unit/test_task_utils.py @@ -19,8 +19,8 @@ df_to_parquet, dtypes_to_json_task, union_dfs_task, - write_to_json, validate_df, + write_to_json, ) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index a94eaff9f..777617244 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -6,9 +6,9 @@ from viadot.signals import SKIP from viadot.utils import ( + add_viadot_metadata_columns, check_if_empty_file, gen_bulk_insert_query_from_df, - add_viadot_metadata_columns, ) EMPTY_CSV_PATH = "empty.csv" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index ed6b5a004..2f30c04d8 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS, SharepointListToADLS +from .sharepoint_to_adls import SharepointListToADLS, SharepointToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL @@ -46,4 +46,4 @@ from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform from .transform_and_catalog import TransformAndCatalogToLuma -from .vid_club_to_adls import VidClubToADLS \ No newline at end of file +from .vid_club_to_adls import VidClubToADLS diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index c234b4a88..0d2deb557 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -17,8 +17,7 @@ validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF - +from viadot.tasks.sharepoint import SharepointListToDF, SharepointToDF logger = logging.get_logger() @@ -181,16 +180,15 @@ def slugify(name): return name.replace(" ", "_").lower() - class SharepointListToADLS(Flow): def __init__( self, name: str, - list_title: str = None, + list_title: str = None, site_url: str = None, required_fields: List[str] = None, field_property: str = "Title", - filters: dict = None, + filters: dict = None, row_count: int = 5000, sp_cert_credentials_secret: str = None, vault_name: str = None, @@ -202,7 +200,7 @@ def __init__( output_file_extension: str = ".parquet", validate_df_dict: dict = None, *args: List[any], - **kwargs: Dict[str, Any], + **kwargs: Dict[str, Any], ): """ Run Flow SharepointListToADLS. @@ -233,7 +231,7 @@ def __init__( 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed } , - 'Column_name_2' : + 'Column_name_2' : { 'dtype': 'str', 'value1':'NM-PL', @@ -247,10 +245,10 @@ def __init__( adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. adls_file_name (str, optional): Name of file in ADLS. Defaults to None. adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with - ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. - + Returns: .parquet file inside ADLS. """ @@ -267,7 +265,6 @@ def __init__( self.row_count = row_count self.validate_df_dict = validate_df_dict - # AzureDataLakeUpload self.adls_dir_path = adls_dir_path self.adls_file_name = adls_file_name @@ -296,26 +293,25 @@ def __init__( adls_dir_path, "schema", self.now + ".json" ) - super().__init__( name=name, - *args, + *args, **kwargs, - ) + ) self.gen_flow() def gen_flow(self) -> Flow: - s = SharepointListToDF( - path = self.path, - list_title = self.list_title, - site_url = self.site_url, - required_fields = self.required_fields, - field_property = self.field_property, - filters = self.filters, - row_count = self.row_count, - credentials_secret=self.sp_cert_credentials_secret, - ) + s = SharepointListToDF( + path=self.path, + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + credentials_secret=self.sp_cert_credentials_secret, + ) df = s.run() if self.validate_df_dict: @@ -327,7 +323,7 @@ def gen_flow(self) -> Flow: df_mapped = df_map_mixed_dtypes_for_parquet.bind( df_with_metadata, dtypes_dict, flow=self ) - + df_to_file = df_to_parquet.bind( df=df_mapped, path=self.path, @@ -347,7 +343,6 @@ def gen_flow(self) -> Flow: dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self ) - json_to_adls_task = AzureDataLakeUpload() json_to_adls_task.bind( from_path=self.local_json_path, @@ -367,4 +362,4 @@ def gen_flow(self) -> Flow: @staticmethod def slugify(name): - return name.replace(" ", "_").lower() \ No newline at end of file + return name.replace(" ", "_").lower() diff --git a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 5a104cff7..23aadee57 100644 --- a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -18,8 +18,8 @@ dtypes_to_json_task, union_dfs_task, update_dtypes_dict, - write_to_json, validate_df, + write_to_json, ) from viadot.tasks import ( AzureDataLakeUpload, diff --git a/viadot/flows/transform_and_catalog.py b/viadot/flows/transform_and_catalog.py index 1de5c4430..08ac6b895 100644 --- a/viadot/flows/transform_and_catalog.py +++ b/viadot/flows/transform_and_catalog.py @@ -1,13 +1,13 @@ import os -from pathlib import Path import shutil +from pathlib import Path from typing import Dict, List, Union from prefect import Flow, task from prefect.tasks.shell import ShellTask from prefect.triggers import any_successful -from viadot.tasks import CloneRepo, AzureKeyVaultSecret, LumaIngest +from viadot.tasks import AzureKeyVaultSecret, CloneRepo, LumaIngest @task(trigger=any_successful) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 8308c78c9..c0d96abe2 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -33,4 +33,4 @@ # APIS from .uk_carbon_intensity import UKCarbonIntensity -from .vid_club import VidClub \ No newline at end of file +from .vid_club import VidClub diff --git a/viadot/sources/bigquery.py b/viadot/sources/bigquery.py index 1be69e866..32d1dac2c 100644 --- a/viadot/sources/bigquery.py +++ b/viadot/sources/bigquery.py @@ -6,8 +6,8 @@ from ..config import local_config from ..exceptions import CredentialError, DBDataAccessError -from .base import Source from ..utils import add_viadot_metadata_columns +from .base import Source class BigQuery(Source): diff --git a/viadot/sources/mindful.py b/viadot/sources/mindful.py index 254eecb9d..2698adb15 100644 --- a/viadot/sources/mindful.py +++ b/viadot/sources/mindful.py @@ -1,12 +1,12 @@ import os -from io import StringIO from datetime import datetime, timedelta +from io import StringIO from typing import Any, Dict, Literal, Tuple import pandas as pd import prefect -from requests.models import Response from requests.auth import HTTPBasicAuth +from requests.models import Response from viadot.exceptions import APIError from viadot.sources.base import Source diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 028499c94..6e935eee2 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,20 +1,20 @@ -from ..config import local_config -from ..exceptions import CredentialError -from .base import Source -from viadot.utils import get_nested_dict - -from typing import Any, Dict, List -from fnmatch import fnmatch -from datetime import datetime from copy import deepcopy -import pandas as pd +from datetime import datetime +from fnmatch import fnmatch +from typing import Any, Dict, List +import pandas as pd import sharepy from office365.runtime.auth.authentication_context import AuthenticationContext -from office365.sharepoint.client_context import ClientContext from office365.runtime.client_request_exception import ClientRequestException +from office365.sharepoint.client_context import ClientContext from prefect.utilities import logging +from viadot.utils import get_nested_dict + +from ..config import local_config +from ..exceptions import CredentialError +from .base import Source logger = logging.get_logger() @@ -82,15 +82,14 @@ def download_file( ) - class SharepointList(Source): """ A Sharepoint_List class to connect and download data from sharpoint lists. Args: credentials (dict): Credentials should include: - - "tenant" - - "client_id" + - "tenant" + - "client_id" - "scopes" - "thumbprint" - "private_key" @@ -106,46 +105,46 @@ def __init__( credentials = credentials or DEFAULT_CREDENTIALS if credentials is None: raise CredentialError("Credentials not found.") - - super().__init__(*args, credentials=credentials, **kwargs) + super().__init__(*args, credentials=credentials, **kwargs) def get_connection( - self, - site_url: str = None, - ): - + self, + site_url: str = None, + ): + # Connecting into Sharepoint with AuthenticationContext try: auth_context = AuthenticationContext(site_url) - auth_context.with_client_certificate(tenant=self.credentials["TENANT"], - client_id=self.credentials["CLIENT_ID"], - scopes=[self.credentials["SCOPES"]], - thumbprint=self.credentials["THUMBPRINT"], - private_key=self.credentials["PRIVATE_KEY"]) - + auth_context.with_client_certificate( + tenant=self.credentials["TENANT"], + client_id=self.credentials["CLIENT_ID"], + scopes=[self.credentials["SCOPES"]], + thumbprint=self.credentials["THUMBPRINT"], + private_key=self.credentials["PRIVATE_KEY"], + ) + self.ctx = ClientContext(site_url, auth_context) logger.info("Successfully connect to Sharepoint Lists") except Exception as ex: logger.info(f"Error at ClientContext or authentication level: {ex}") - return self.ctx - + return self.ctx # Function for extracting list items from search fields def _unpack_fields( - self, - list_item, - selected_fields: dict = None, - ): - + self, + list_item, + selected_fields: dict = None, + ): + # Creating the body of dictionary new_dict = dict() # For loop scanning the propertys of searching fields item_values_dict = list_item.properties - for field,val in item_values_dict.items(): + for field, val in item_values_dict.items(): nested_dict = get_nested_dict(val) # Check if the dictionary is nested if nested_dict != None: @@ -158,18 +157,17 @@ def _unpack_fields( raise ValueError else: new_dict[field] = val - - return new_dict - + + return new_dict def get_fields( - self, - list_title: str = None, - site_url: str = None, - required_fields: List[str] = None, - ): - - ctx = self.get_connection(site_url = site_url) + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + ): + + ctx = self.get_connection(site_url=site_url) # Get list of lists object by List Title self.list_object = ctx.web.lists.get_by_title(list_title) @@ -181,219 +179,278 @@ def get_fields( ctx.execute_query() return list_fields_all - + else: - list_fields_required = [list_fields_all.get_by_internal_name_or_title(field).get() - for field in required_fields] + list_fields_required = [ + list_fields_all.get_by_internal_name_or_title(field).get() + for field in required_fields + ] ctx.execute_batch() - + return list_fields_required - - + def select_expandable_user_fields( - self, - list_title: str = None, - site_url: str = None, - required_fields: List[str] = None, - field_property: str = "Title", - ): + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + ): """ Method to expand fields and get more informations. - field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. - -> more properties can be discovered by getting list.item.properties. + field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. + -> more properties can be discovered by getting list.item.properties. Default to "Title" """ - list_fields = self.get_fields(list_title=list_title, - site_url=site_url, - required_fields=required_fields) - + list_fields = self.get_fields( + list_title=list_title, site_url=site_url, required_fields=required_fields + ) + # Finding the "selected" fields - fields_to_select = [field.properties['InternalName'] +f"/{field_property}" - if fnmatch(field.properties['TypeAsString'],"User*") - else field.properties['InternalName'] for field in list_fields] + fields_to_select = [ + field.properties["InternalName"] + f"/{field_property}" + if fnmatch(field.properties["TypeAsString"], "User*") + else field.properties["InternalName"] + for field in list_fields + ] # Finding the "expanded" fields - fields_to_expand = [field.properties['InternalName'] for field in list_fields - if fnmatch(field.properties['TypeAsString'],f"User*")] - + fields_to_expand = [ + field.properties["InternalName"] + for field in list_fields + if fnmatch(field.properties["TypeAsString"], f"User*") + ] + # Creating the body of the function output - selected_fields = {"FieldInternalNames" : fields_to_select, "FieldToExpand": fields_to_expand, "FieldProperty": field_property} + selected_fields = { + "FieldInternalNames": fields_to_select, + "FieldToExpand": fields_to_expand, + "FieldProperty": field_property, + } return selected_fields - def check_filters( - self, - filters: dict = None, - ) -> bool: + self, + filters: dict = None, + ) -> bool: """ Function to check if filters dict is valid. example1: if operator2 is present value2 must be in place as well example2: if dtype is not on allowed list it will throw an error """ - allowed_dtypes = ['datetime','date','bool','int', 'float', 'complex', 'str'] - allowed_conjuction = ['&','|'] - allowed_operators = ['<','>','<=','>=','==','!='] - + allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"] + allowed_conjuction = ["&", "|"] + allowed_operators = ["<", ">", "<=", ">=", "==", "!="] + for parameters in filters.values(): - if parameters.get('dtype') not in allowed_dtypes: - raise ValueError(f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}.") - if parameters.get('operator1'): - if parameters.get('operator1') not in allowed_operators: - raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}.") - if not parameters.get('value1'): - raise ValueError('Value for operator1 is missing!') - elif not parameters.get('operator1') : raise ValueError('Operator1 is missing!') - if not parameters.get('operator2') and parameters.get('operators_conjuction') is not None: - raise ValueError(f"Operator conjuction allowed only with more than one filter operator!") - if parameters.get('operator2'): - if parameters.get('operator2') not in allowed_operators: - raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}.") - if not parameters.get('value2'): - raise ValueError('Value for operator2 is missing!') - if not parameters.get('operators_conjuction'): - raise ValueError(f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty.") - if parameters.get('operators_conjuction') not in allowed_conjuction: - raise ValueError(f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}.") - if parameters.get('filters_conjuction'): - if len(filters.keys()) == 1 and parameters.get('filters_conjuction') is not None: - raise ValueError(f"Filters conjuction allowed only with more than one filter column!") - if parameters.get('filters_conjuction') not in allowed_conjuction: - raise ValueError(f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}.") + if parameters.get("dtype") not in allowed_dtypes: + raise ValueError( + f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}." + ) + if parameters.get("operator1"): + if parameters.get("operator1") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}." + ) + if not parameters.get("value1"): + raise ValueError("Value for operator1 is missing!") + elif not parameters.get("operator1"): + raise ValueError("Operator1 is missing!") + if ( + not parameters.get("operator2") + and parameters.get("operators_conjuction") is not None + ): + raise ValueError( + f"Operator conjuction allowed only with more than one filter operator!" + ) + if parameters.get("operator2"): + if parameters.get("operator2") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}." + ) + if not parameters.get("value2"): + raise ValueError("Value for operator2 is missing!") + if not parameters.get("operators_conjuction"): + raise ValueError( + f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty." + ) + if parameters.get("operators_conjuction") not in allowed_conjuction: + raise ValueError( + f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}." + ) + if parameters.get("filters_conjuction"): + if ( + len(filters.keys()) == 1 + and parameters.get("filters_conjuction") is not None + ): + raise ValueError( + f"Filters conjuction allowed only with more than one filter column!" + ) + if parameters.get("filters_conjuction") not in allowed_conjuction: + raise ValueError( + f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}." + ) return True - def operators_mapping( - self, - filters: dict = None, - ) -> dict: - """ + self, + filters: dict = None, + ) -> dict: + """ Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. - + Args: - filters (dict): A dictionar which contains operators. - + filters (dict): A dictionar which contains operators. + Returns: New modified dict. """ filters_dict = deepcopy(filters) operators = { - '<' : 'lt', - '>' : 'gt', - '<=' : 'le', - '>=' : 'ge', - '==' : 'eq', - '!=' : 'ne' - } - logical_op = { - '&' : 'and', - '|' : 'or' + "<": "lt", + ">": "gt", + "<=": "le", + ">=": "ge", + "==": "eq", + "!=": "ne", } - + logical_op = {"&": "and", "|": "or"} + for parameters in filters_dict.values(): - if parameters.get('operator1'): - operator1_to_change = parameters.get('operator1') + if parameters.get("operator1"): + operator1_to_change = parameters.get("operator1") if operator1_to_change in operators.keys(): - parameters['operator1'] = operators[operator1_to_change] - else: raise ValueError(f'This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!') - if parameters.get('operator2'): - operator2_to_change = parameters.get('operator2') + parameters["operator1"] = operators[operator1_to_change] + else: + raise ValueError( + f"This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operator2"): + operator2_to_change = parameters.get("operator2") if operator2_to_change in operators.keys(): - parameters['operator2'] = operators[operator2_to_change] - else: raise ValueError(f'This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!') - if parameters.get('operators_conjuction'): - logical_op_to_change = parameters.get('operators_conjuction') + parameters["operator2"] = operators[operator2_to_change] + else: + raise ValueError( + f"This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operators_conjuction"): + logical_op_to_change = parameters.get("operators_conjuction") if logical_op_to_change in logical_op.keys(): - parameters['operators_conjuction'] = logical_op[logical_op_to_change] - else: raise ValueError(f'This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!') - if parameters.get('filters_conjuction'): - logical_fl_to_change = parameters.get('filters_conjuction') + parameters["operators_conjuction"] = logical_op[ + logical_op_to_change + ] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("filters_conjuction"): + logical_fl_to_change = parameters.get("filters_conjuction") if logical_fl_to_change in logical_op.keys(): - parameters['filters_conjuction'] = logical_op[logical_fl_to_change] - else: raise ValueError(f'This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!') - - return filters_dict + parameters["filters_conjuction"] = logical_op[logical_fl_to_change] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!" + ) + return filters_dict - def make_filter_for_api( - self, - filters: dict - ) -> 'str': + def make_filter_for_api(self, filters: dict) -> "str": """ Function changing type of operators to match MS API style as 'str' passing to URL call. Args: - filters (dict): A dictionar which contains operators. + filters (dict): A dictionar which contains operators. Returns: Output as string to pass as filter parameter to API. """ - - filter_text = '' + + filter_text = "" filters_mod = self.operators_mapping(filters) for column, parameters in filters_mod.items(): - if parameters.get('dtype') in ['datetime','date']: - from_date1 = datetime.strptime(parameters.get('value1'),'%Y-%m-%d').isoformat() - filter_text = filter_text + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " - if parameters.get('operator2'): - from_date2 = datetime.strptime(parameters.get('value2'),'%Y-%m-%d').isoformat() - filter_text = filter_text + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " - elif parameters.get('dtype') not in ['datetime','date']: - filter_text = filter_text + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" - if parameters.get('operator2'): - filter_text = filter_text + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" - if parameters.get('filters_conjuction'): + if parameters.get("dtype") in ["datetime", "date"]: + from_date1 = datetime.strptime( + parameters.get("value1"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " + ) + if parameters.get("operator2"): + from_date2 = datetime.strptime( + parameters.get("value2"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + ) + elif parameters.get("dtype") not in ["datetime", "date"]: + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" + ) + if parameters.get("operator2"): + filter_text = ( + filter_text + + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" + ) + if parameters.get("filters_conjuction"): filter_text = filter_text + f"{parameters.get('filters_conjuction')} " return filter_text - def make_filter_for_df( - self, - filters: dict = None, - ) -> 'str': + self, + filters: dict = None, + ) -> "str": """ Function changing dict operators into pandas DataFrame filters. Args: - filters (dict): A dictionar which contains operators. + filters (dict): A dictionar which contains operators. Returns: Output as string to pass as filter to DataFrame. """ - filter_in_df = 'df.loc[' + filter_in_df = "df.loc[" for column, parameters in filters.items(): - filter_in_df = filter_in_df + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + filter_in_df = ( + filter_in_df + + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + ) - if parameters.get('operator2'): - filter_in_df = filter_in_df + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + if parameters.get("operator2"): + filter_in_df = ( + filter_in_df + + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + ) - if parameters.get('filters_conjuction'): - filter_in_df = filter_in_df + ')' + parameters.get('filters_conjuction') + if parameters.get("filters_conjuction"): + filter_in_df = filter_in_df + ")" + parameters.get("filters_conjuction") - else: - filter_in_df = filter_in_df + ')' + else: + filter_in_df = filter_in_df + ")" - filter_in_df = filter_in_df + ']' + filter_in_df = filter_in_df + "]" return filter_in_df - def list_item_to_df( - self, - list_title: str = None, - site_url: str = None, - required_fields: List[str] = None, - field_property: str = "Title", - filters: dict = None, - row_count: int = 5000, - ): + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + ): """ Method to extract data from Sharepoint List into DataFrame. @@ -422,7 +479,7 @@ def list_item_to_df( 'filters_conjuction':'&', } , - 'Column_name_2' : + 'Column_name_2' : { 'dtype': 'str', 'value1':'NM-PL', @@ -430,7 +487,7 @@ def list_item_to_df( }, } row_count (int): Number of downloaded rows in single request. Default to 5000. - + Returns: pd.DataFrame """ @@ -441,42 +498,57 @@ def list_item_to_df( # checking that the filter parameters are included in the desired field parameters for key in filters: if key not in required_fields: - raise AttributeError(f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!") - + raise AttributeError( + f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!" + ) + # changing the body of the filter for MS API call filter_text = self.make_filter_for_api(filters) download_all = False # extracting requeird_fields SP_List objects - selected_fields = self.select_expandable_user_fields(list_title=list_title, - site_url=site_url, - required_fields=required_fields, - field_property=field_property) + selected_fields = self.select_expandable_user_fields( + list_title=list_title, + site_url=site_url, + required_fields=required_fields, + field_property=field_property, + ) try: # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. if filters is None: raise ValueError("There is no filter. Starting extraxction all data") else: - list_items= self.list_object.items.filter(filter_text).select(selected_fields["FieldInternalNames"]).top(row_count).expand(selected_fields["FieldToExpand"]) + list_items = ( + self.list_object.items.filter(filter_text) + .select(selected_fields["FieldInternalNames"]) + .top(row_count) + .expand(selected_fields["FieldToExpand"]) + ) self.ctx.load(list_items) self.ctx.execute_query() except (ClientRequestException, ValueError) as e: # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFreame level. logger.info(f"Exception SPQueryThrottledException occurred: {e}") - list_items = self.list_object.items.get_all(row_count,log_of_progress).select(selected_fields["FieldInternalNames"]).expand(selected_fields["FieldToExpand"]) + list_items = ( + self.list_object.items.get_all(row_count, log_of_progress) + .select(selected_fields["FieldInternalNames"]) + .expand(selected_fields["FieldToExpand"]) + ) self.ctx.load(list_items) self.ctx.execute_query() download_all = True - df = pd.DataFrame([self._unpack_fields(row_item,selected_fields) for row_item in list_items]) + df = pd.DataFrame( + [self._unpack_fields(row_item, selected_fields) for row_item in list_items] + ) - if download_all == True and filters is not None: + if download_all == True and filters is not None: # Filter for desired range of created date and for factory Namyslow PL self.logger.info("Filtering df with all data output") filter_for_df = self.make_filter_for_df(filters) df = eval(filter_for_df) - return df \ No newline at end of file + return df diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 6173e2994..6a532f932 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -1,8 +1,8 @@ import copy import json import os -import shutil import re +import shutil from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 1e67d96a6..35543045a 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF, SharepointListToDF +from .sharepoint import SharepointListToDF, SharepointToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF @@ -50,11 +50,11 @@ from .duckdb import DuckDBCreateTableFromParquet, DuckDBQuery, DuckDBToDF from .epicor import EpicorOrdersToDF from .eurostat import EurostatToDF +from .git import CloneRepo from .hubspot import HubspotToDF +from .luma import LumaIngest from .mediatool import MediatoolToDF from .mindful import MindfulToCSV from .sftp import SftpList, SftpToDF from .sql_server import SQLServerCreateTable, SQLServerQuery, SQLServerToDF from .vid_club import VidClubToDF -from .git import CloneRepo -from .luma import LumaIngest \ No newline at end of file diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index de47ddebf..dda6c4dde 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,10 +10,10 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs -from viadot.task_utils import * from viadot.exceptions import APIError from viadot.sources import Genesys +from viadot.task_utils import * logger = logging.get_logger() diff --git a/viadot/tasks/luma.py b/viadot/tasks/luma.py index 5b78ebc27..11eb91e45 100644 --- a/viadot/tasks/luma.py +++ b/viadot/tasks/luma.py @@ -1,5 +1,7 @@ import json + from prefect.tasks.shell import ShellTask + from .azure_key_vault import AzureKeyVaultSecret diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py index acc92c246..0d8d7b2e3 100644 --- a/viadot/tasks/sap_bw.py +++ b/viadot/tasks/sap_bw.py @@ -1,12 +1,12 @@ import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret -from viadot.tasks import AzureKeyVaultSecret from prefect.utilities import logging from viadot.exceptions import ValidationError from viadot.sources import SAPBW from viadot.task_utils import * +from viadot.tasks import AzureKeyVaultSecret logger = logging.get_logger() diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py index cb7f30a19..c4d670617 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,10 +1,10 @@ -from typing import List -import pandas as pd import copy import json import os import re +from typing import List +import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging @@ -12,8 +12,8 @@ from ..exceptions import ValidationError from ..sources import Sharepoint, SharepointList -from .azure_key_vault import AzureKeyVaultSecret from ..utils import add_viadot_metadata_columns +from .azure_key_vault import AzureKeyVaultSecret logger = logging.get_logger() @@ -233,7 +233,6 @@ def run( return df - class SharepointListToDF(Task): """ Task to extract data from Sharepoint List into DataFrame. @@ -263,7 +262,7 @@ class SharepointListToDF(Task): 'filters_conjuction':'&', } , - 'Column_name_2' : + 'Column_name_2' : { 'dtype': 'str', 'value1':'NM-PL', @@ -279,18 +278,18 @@ class SharepointListToDF(Task): def __init__( self, path: str = None, - list_title: str = None, + list_title: str = None, site_url: str = None, required_fields: List[str] = None, field_property: str = "Title", - filters: dict = None, + filters: dict = None, row_count: int = 5000, credentials_secret: str = None, vault_name: str = None, *args, **kwargs, ): - + self.path = path self.list_title = list_title self.site_url = site_url @@ -300,7 +299,7 @@ def __init__( self.row_count = row_count self.vault_name = vault_name self.credentials_secret = credentials_secret - + if not credentials_secret: # Attempt to read a default for the service principal secret name try: @@ -310,50 +309,44 @@ def __init__( if credentials_secret: credentials_str = AzureKeyVaultSecret( - secret = self.credentials_secret, vault_name=self.vault_name + secret=self.credentials_secret, vault_name=self.vault_name ).run() self.credentials = json.loads(credentials_str) - super().__init__( *args, **kwargs, ) - def __call__(self): """Download Sharepoint_List data to a .parquet file""" super().__call__(self) - - def _convert_camel_case_to_words( - self, - input_str: str) -> str: - + def _convert_camel_case_to_words(self, input_str: str) -> str: + self.input_str = input_str - words = re.findall(r'[A-Z][a-z]*|[0-9]+', self.input_str) - converted = ' '.join(words) + words = re.findall(r"[A-Z][a-z]*|[0-9]+", self.input_str) + converted = " ".join(words) + + return converted - return converted - - def change_column_name( self, df: pd.DataFrame = None, ): s = SharepointList() list_fields = s.get_fields( - list_title= self.list_title, - site_url= self.site_url, - required_fields= self.required_fields, - ) - + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + ) + self.logger.info("Changing columns names") - column_names_correct = [field.properties['Title'] for field in list_fields] - column_names_code = [field.properties['InternalName'] for field in list_fields] - dictionary = dict(zip(column_names_code, column_names_correct)) - + column_names_correct = [field.properties["Title"] for field in list_fields] + column_names_code = [field.properties["InternalName"] for field in list_fields] + dictionary = dict(zip(column_names_code, column_names_correct)) + # If duplicates in names from "Title" take "InternalName" value_count = {} duplicates = [] @@ -365,19 +358,18 @@ def change_column_name( duplicates.append(key) else: value_count[value] = key - + for key in duplicates: dictionary[key] = self._convert_camel_case_to_words(key) - + # Rename columns names inside DataFrame df = df.rename(columns=dictionary) return df - def run( self, - ) -> None: + ) -> None: """ Run Task SharepointListToDF. @@ -385,17 +377,19 @@ def run( pd.DataFrame """ - s = SharepointList(credentials=self.credentials,) + s = SharepointList( + credentials=self.credentials, + ) df_raw = s.list_item_to_df( - list_title = self.list_title, - site_url = self.site_url, - required_fields = self.required_fields, - field_property = self.field_property, - filters = self.filters, - row_count = self.row_count, - ) - + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + ) + df = self.change_column_name(df=df_raw) self.logger.info("Successfully changed structure of the DataFrame") - return df \ No newline at end of file + return df diff --git a/viadot/utils.py b/viadot/utils.py index 49fa33e09..d05cfdd95 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -459,4 +459,4 @@ def get_nested_dict(d): else: return d else: - return None \ No newline at end of file + return None