diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0be2f82..2ede484c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Added `validate_df` task to task_utils. +- Added `SharepointList` source class. +- Added `SharepointListToDF` task class. +- Added `SharepointListToADLS` flow class. +- Added tests for `SharepointList`. +- Added `get_nested_dict` to untils.py. +- Added `validate_df` task to `SharepointToADLS` class. + ### Fixed ### Changed @@ -618,4 +625,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret - Moved from poetry to pip ### Fixed -- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part +- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part \ No newline at end of file diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index c784fa682..b56d10182 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,6 +1,8 @@ import os +import re import pandas as pd +from copy import deepcopy import pytest from prefect.tasks.secrets import PrefectSecret @@ -9,6 +11,7 @@ from viadot.sources import Sharepoint from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF +from viadot.sources import SharepointList def get_url() -> str: @@ -18,7 +21,7 @@ def get_url() -> str: Returns: str: File URL. """ - return local_config["SHAREPOINT"].get("url") + return local_config["SHAREPOINT"].get("file_url") @pytest.fixture(scope="session") @@ -163,3 +166,150 @@ def test_get_data_types(file_name): dtypes = dtypes_map.values() assert "String" in dtypes + + +# testing get_connection function passing invalid credentials and raises AuthenticationContext error. +def test_get_connection(): + site_url = "https://velux.sharepoint.com/" + credentials ={ + "SHAREPOINT_CERT": + {"TENANT": "xxx", + "CLIENT_ID": "123", + "SCOPES": "https://velux.sharepoint.com/", + "THUMBPRINT": "xyz", + "PRIVATE_KEY": "private"} + } + + spl = SharepointList(credentials=credentials) + with pytest.raises(AttributeError, match="'SharepointList' object has no attribute 'ctx'"): + spl.get_connection(site_url=site_url) + + +@pytest.fixture(scope="session") +def sharepoint_list(): + """ + Fixture for creating a Sharepoint class instance. + The class instance can be used within a test functions to interact with Sharepoint. 
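The `AttributeError` asserted in `test_get_connection` above is expected because `get_connection` (added to `viadot/sources/sharepoint.py` further down) only logs the certificate-authentication failure and then returns `self.ctx`, which was never assigned. For reference, a placeholder sketch of the certificate credentials the new source reads, using the uppercase keys accessed in `get_connection` (all values are illustrative):

```python
# Placeholder values only - the keys mirror self.credentials["TENANT"], ["CLIENT_ID"],
# ["SCOPES"], ["THUMBPRINT"] and ["PRIVATE_KEY"] used by SharepointList.get_connection().
SHAREPOINT_CERT = {
    "TENANT": "<tenant-id>",
    "CLIENT_ID": "<azure-app-client-id>",
    "SCOPES": "https://<org>.sharepoint.com/",
    "THUMBPRINT": "<certificate-thumbprint>",
    "PRIVATE_KEY": "<pem-encoded-private-key>",
}
```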
+ """ + spl = SharepointList() + yield spl + + +def test_valid_filters(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '<', 'value1': 10}, + 'filter2': {'dtype': 'str', 'operator1': '==', 'value1': 'value'}, + } + result = sharepoint_list.check_filters(filters) + assert result is True + +def test_invalid_dtype(sharepoint_list): + filters = { + 'filter1': {'dtype': 'list', 'operator1': '>', 'value1': 10}, + } + with pytest.raises(ValueError, match="dtype not allowed!"): + sharepoint_list.check_filters(filters) + +def test_missing_operator1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'value1': 10}, + } + with pytest.raises(ValueError, match="Operator1 is missing!"): + sharepoint_list.check_filters(filters) + +def test_invalid_operator1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '*', 'value1': 10}, + } + with pytest.raises(ValueError, match="Operator type not allowed!"): + sharepoint_list.check_filters(filters) + +def test_missing_value1(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': None}, + } + with pytest.raises(ValueError, match="Value for operator1 is missing!"): + sharepoint_list.check_filters(filters) + +def test_missing_operators_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20}, + } + with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + sharepoint_list.check_filters(filters) + +def test_invalid_operators_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'operator2': '<', 'value2': 20, 'operators_conjuction': '!'}, + } + with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + +def test_invalid_filters_conjuction(sharepoint_list): + filters = { + 'filter1': {'dtype': 'int', 'operator1': '>', 'value1': 10, 'filters_conjuction': '!'}, + } + with pytest.raises(ValueError, match="Filters operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + +def test_valid_mapping(sharepoint_list): + filters = { + 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, + 'filter2': {'operator1': '==', 'operator2': '!=', 'operators_conjuction': '|'}, + } + expected_result = { + 'filter1': {'operator1': 'gt', 'operator2': 'le', 'operators_conjuction': 'and', 'filters_conjuction': 'or'}, + 'filter2': {'operator1': 'eq', 'operator2': 'ne', 'operators_conjuction': 'or'}, + } + result = sharepoint_list.operators_mapping(deepcopy(filters)) + assert result == expected_result + +def test_invalid_comparison_operator(sharepoint_list): + filters = { + 'filter1': {'operator1': '*', 'operator2': '<=', 'operators_conjuction': '&', 'filters_conjuction': '|'}, + } + error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + +def test_invalid_logical_operator(sharepoint_list): + filters = { + 'filter1': {'operator1': '>', 'operator2': '<=', 'operators_conjuction': '!', 'filters_conjuction': '|'}, + } + error_message = "This conjuction(logical) operator: ! is not allowed. Please read the function documentation for details!" 
+ with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + +def test_single_filter_datetime_api(sharepoint_list): + filters = { + 'date_column': {'dtype': 'datetime', 'operator1': '>', 'value1': '2023-01-01'} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "date_column gt datetime'2023-01-01T00:00:00' " + assert result == expected_result + +def test_multiple_filters_api(sharepoint_list): + filters = { + 'int_column': {'dtype': 'int', 'operator1': '>=', 'value1': 10, 'operator2': '<', 'value2': 20}, + 'str_column': {'dtype': 'str', 'operator1': '==', 'value1': 'example'} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "int_column ge '10'int_column lt '20'str_column eq 'example'" + assert result == expected_result + +def test_single_df_filter(sharepoint_list): + filters = { + 'column1': {'operator1': '>', 'value1': 10} + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')]" + assert result == expected_result + +def test_multiple_df_filters(sharepoint_list): + filters = { + 'column1': {'operator1': '>', 'value1': 10, 'filters_conjuction': '&'}, + 'column2': {'operator1': '<', 'value1': 20} + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')&(df.column2 < '20')]" + assert result == expected_result \ No newline at end of file diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index de2f618ab..ed6b5a004 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS +from .sharepoint_to_adls import SharepointToADLS, SharepointListToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL @@ -46,4 +46,4 @@ from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform from .transform_and_catalog import TransformAndCatalogToLuma -from .vid_club_to_adls import VidClubToADLS +from .vid_club_to_adls import VidClubToADLS \ No newline at end of file diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index df5562221..c234b4a88 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -3,12 +3,10 @@ from typing import Any, Dict, List import pendulum -from prefect import Flow, task +from prefect import Flow from prefect.backend import set_key_value from prefect.utilities import logging -logger = logging.get_logger() - from viadot.task_utils import ( add_ingestion_metadata_task, df_get_data_types_task, @@ -16,9 +14,13 @@ df_to_csv, df_to_parquet, dtypes_to_json_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF +from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF + + +logger = logging.get_logger() class SharepointToADLS(Flow): @@ -38,6 +40,7 @@ def __init__( overwrite_adls: bool = False, if_empty: str = "warn", if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -62,6 +65,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. 
if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -74,6 +79,7 @@ def __init__( self.sheet_number = sheet_number self.validate_excel_file = validate_excel_file self.timeout = timeout + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.overwrite = overwrite_adls @@ -117,6 +123,10 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_mapped = df_map_mixed_dtypes_for_parquet.bind( @@ -169,3 +179,192 @@ def gen_flow(self) -> Flow: @staticmethod def slugify(name): return name.replace(" ", "_").lower() + + + +class SharepointListToADLS(Flow): + def __init__( + self, + name: str, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + sp_cert_credentials_secret: str = None, + vault_name: str = None, + path: str = None, + adls_dir_path: str = None, + adls_file_name: str = None, + adls_sp_credentials_secret: str = None, + overwrite_adls: bool = True, + output_file_extension: str = ".parquet", + validate_df_dict: dict = None, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Run Flow SharepointListToADLS. + + Args: + name (str): Prefect flow name. + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', # conjuction operators allowed only when 2 values passed + 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None. + vault_name (str): KeyVaultSecret name. Default to None. + path (str): Local file path. Default to None. + adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. 
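When `validate_df_dict` is provided, both flows insert the `validate_df` task from `task_utils` between extraction and the metadata/upload steps. A hedged illustration of the argument; the rule names below are assumptions, so check `viadot.task_utils.validate_df` for the checks it actually supports:

```python
# Illustrative only - rule names are assumptions; consult viadot.task_utils.validate_df.
validate_df_dict = {
    "dataset_row_count": {"min": 1},          # e.g. fail the flow on an empty extract
    "column_list_to_match": ["Title", "ID"],  # e.g. enforce an expected column set
}
# passed as SharepointToADLS(..., validate_df_dict=validate_df_dict)
# or        SharepointListToADLS(..., validate_df_dict=validate_df_dict)
```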
+ adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, + CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + + Returns: + .parquet file inside ADLS. + """ + + # SharepointListToDF + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.sp_cert_credentials_secret = sp_cert_credentials_secret + self.vault_name = vault_name + self.row_count = row_count + self.validate_df_dict = validate_df_dict + + + # AzureDataLakeUpload + self.adls_dir_path = adls_dir_path + self.adls_file_name = adls_file_name + self.overwrite = overwrite_adls + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + if self.path is not None: + self.local_file_path = ( + self.path + self.slugify(name) + self.output_file_extension + ) + else: + self.local_file_path = self.slugify(name) + self.output_file_extension + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + def gen_flow(self) -> Flow: + s = SharepointListToDF( + path = self.path, + list_title = self.list_title, + site_url = self.site_url, + required_fields = self.required_fields, + field_property = self.field_property, + filters = self.filters, + row_count = self.row_count, + credentials_secret=self.sp_cert_credentials_secret, + ) + df = s.run() + + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.path, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload() + file_to_adls_task.bind( + from_path=self.path, + to_path=self.adls_dir_path, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_to_json_task.bind( + dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self + ) + + + json_to_adls_task = AzureDataLakeUpload() + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + df_mapped.set_upstream(df_with_metadata, flow=self) + dtypes_to_json_task.set_upstream(df_mapped, flow=self) + df_to_file.set_upstream(dtypes_to_json_task, flow=self) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) 
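A hypothetical end-to-end instantiation of `SharepointListToADLS`; the list title, site URL, field names, secret names and ADLS paths are placeholders:

```python
# Hypothetical usage sketch - all identifiers below are placeholders.
flow = SharepointListToADLS(
    name="sharepoint list to adls",
    list_title="My List",
    site_url="https://<org>.sharepoint.com/sites/<site>",
    required_fields=["Title", "Created", "Status"],
    filters={
        # every filtered column must also appear in required_fields
        "Created": {"dtype": "datetime", "operator1": ">=", "value1": "2023-01-01"},
    },
    path="my_list.parquet",
    adls_dir_path="raw/sharepoint_lists",
    adls_file_name="my_list.parquet",
    sp_cert_credentials_secret="SHAREPOINT-CERT",      # Key Vault secret with the certificate credentials
    adls_sp_credentials_secret="ADLS-SP-CREDENTIALS",  # Key Vault secret for the lake upload
)
flow.run()
```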
+ set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() \ No newline at end of file diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 094cec14e..8308c78c9 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -8,7 +8,7 @@ from .outlook import Outlook from .salesforce import Salesforce from .sftp import SftpConnector -from .sharepoint import Sharepoint +from .sharepoint import Sharepoint, SharepointList from .supermetrics import Supermetrics try: @@ -33,4 +33,4 @@ # APIS from .uk_carbon_intensity import UKCarbonIntensity -from .vid_club import VidClub +from .vid_club import VidClub \ No newline at end of file diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 61eda17f2..028499c94 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,10 +1,26 @@ -from typing import Any, Dict - -import sharepy - from ..config import local_config from ..exceptions import CredentialError from .base import Source +from viadot.utils import get_nested_dict + +from typing import Any, Dict, List +from fnmatch import fnmatch +from datetime import datetime +from copy import deepcopy +import pandas as pd + +import sharepy +from office365.runtime.auth.authentication_context import AuthenticationContext +from office365.sharepoint.client_context import ClientContext +from office365.runtime.client_request_exception import ClientRequestException +from prefect.utilities import logging + + +logger = logging.get_logger() + +# Print out how many rows was extracted in specific iteration +def log_of_progress(items): + logger.info("Items read: {0}".format(len(items))) class Sharepoint(Source): @@ -64,3 +80,403 @@ def download_file( url=download_from_path, filename=download_to_path, ) + + + +class SharepointList(Source): + """ + A Sharepoint_List class to connect and download data from sharpoint lists. 
+ + Args: + credentials (dict): Credentials should include: + - "tenant" + - "client_id" + - "scopes" + - "thumbprint" + - "private_key" + """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + *args, + **kwargs, + ): + DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT") + credentials = credentials or DEFAULT_CREDENTIALS + if credentials is None: + raise CredentialError("Credentials not found.") + + super().__init__(*args, credentials=credentials, **kwargs) + + + def get_connection( + self, + site_url: str = None, + ): + + # Connecting into Sharepoint with AuthenticationContext + try: + auth_context = AuthenticationContext(site_url) + auth_context.with_client_certificate(tenant=self.credentials["TENANT"], + client_id=self.credentials["CLIENT_ID"], + scopes=[self.credentials["SCOPES"]], + thumbprint=self.credentials["THUMBPRINT"], + private_key=self.credentials["PRIVATE_KEY"]) + + self.ctx = ClientContext(site_url, auth_context) + logger.info("Successfully connect to Sharepoint Lists") + + except Exception as ex: + logger.info(f"Error at ClientContext or authentication level: {ex}") + + return self.ctx + + + # Function for extracting list items from search fields + def _unpack_fields( + self, + list_item, + selected_fields: dict = None, + ): + + # Creating the body of dictionary + new_dict = dict() + + # For loop scanning the propertys of searching fields + item_values_dict = list_item.properties + for field,val in item_values_dict.items(): + nested_dict = get_nested_dict(val) + # Check if the dictionary is nested + if nested_dict != None: + # It might be that there are different field properties than expected + nested_value = nested_dict.get(selected_fields["FieldProperty"]) + if nested_value != None: + new_dict[field] = nested_value + else: + logger.info("I'm not the right value") + raise ValueError + else: + new_dict[field] = val + + return new_dict + + + def get_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + ): + + ctx = self.get_connection(site_url = site_url) + + # Get list of lists object by List Title + self.list_object = ctx.web.lists.get_by_title(list_title) + list_fields_all = self.list_object.fields + + # Get all or specifics list of objects + if required_fields is None: + ctx.load(list_fields_all) + ctx.execute_query() + + return list_fields_all + + else: + list_fields_required = [list_fields_all.get_by_internal_name_or_title(field).get() + for field in required_fields] + ctx.execute_batch() + + return list_fields_required + + + def select_expandable_user_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + ): + """ + Method to expand fields and get more informations. + field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. + -> more properties can be discovered by getting list.item.properties. 
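For a list containing one plain column and one person (`User*`) column, the `selected_fields` mapping returned by this method would look roughly as follows; the internal names are illustrative:

```python
# Illustrative output of select_expandable_user_fields() for a plain "Title" column
# and a person column with internal name "Editor".
selected_fields = {
    "FieldInternalNames": ["Title", "Editor/Title"],  # User-type fields get "/<field_property>" appended
    "FieldToExpand": ["Editor"],                      # only User-type fields are expanded
    "FieldProperty": "Title",
}
```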
+ Default to "Title" + """ + + list_fields = self.get_fields(list_title=list_title, + site_url=site_url, + required_fields=required_fields) + + # Finding the "selected" fields + fields_to_select = [field.properties['InternalName'] +f"/{field_property}" + if fnmatch(field.properties['TypeAsString'],"User*") + else field.properties['InternalName'] for field in list_fields] + # Finding the "expanded" fields + fields_to_expand = [field.properties['InternalName'] for field in list_fields + if fnmatch(field.properties['TypeAsString'],f"User*")] + + # Creating the body of the function output + selected_fields = {"FieldInternalNames" : fields_to_select, "FieldToExpand": fields_to_expand, "FieldProperty": field_property} + + return selected_fields + + + def check_filters( + self, + filters: dict = None, + ) -> bool: + """ + Function to check if filters dict is valid. + example1: if operator2 is present value2 must be in place as well + example2: if dtype is not on allowed list it will throw an error + """ + + allowed_dtypes = ['datetime','date','bool','int', 'float', 'complex', 'str'] + allowed_conjuction = ['&','|'] + allowed_operators = ['<','>','<=','>=','==','!='] + + for parameters in filters.values(): + if parameters.get('dtype') not in allowed_dtypes: + raise ValueError(f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}.") + if parameters.get('operator1'): + if parameters.get('operator1') not in allowed_operators: + raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}.") + if not parameters.get('value1'): + raise ValueError('Value for operator1 is missing!') + elif not parameters.get('operator1') : raise ValueError('Operator1 is missing!') + if not parameters.get('operator2') and parameters.get('operators_conjuction') is not None: + raise ValueError(f"Operator conjuction allowed only with more than one filter operator!") + if parameters.get('operator2'): + if parameters.get('operator2') not in allowed_operators: + raise ValueError(f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}.") + if not parameters.get('value2'): + raise ValueError('Value for operator2 is missing!') + if not parameters.get('operators_conjuction'): + raise ValueError(f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty.") + if parameters.get('operators_conjuction') not in allowed_conjuction: + raise ValueError(f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}.") + if parameters.get('filters_conjuction'): + if len(filters.keys()) == 1 and parameters.get('filters_conjuction') is not None: + raise ValueError(f"Filters conjuction allowed only with more than one filter column!") + if parameters.get('filters_conjuction') not in allowed_conjuction: + raise ValueError(f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}.") + + return True + + + def operators_mapping( + self, + filters: dict = None, + ) -> dict: + """ + Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + New modified dict. 
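Concretely, mirroring `test_valid_mapping` in the tests above, the mapping rewrites the comparison and logical tokens in place:

```python
# Example input/output of operators_mapping (values taken from the accompanying tests):
# operators_mapping(filters_in) returns filters_out.
filters_in = {
    "filter1": {"operator1": ">", "operator2": "<=", "operators_conjuction": "&", "filters_conjuction": "|"},
}
filters_out = {
    "filter1": {"operator1": "gt", "operator2": "le", "operators_conjuction": "and", "filters_conjuction": "or"},
}
```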
+ """ + + filters_dict = deepcopy(filters) + operators = { + '<' : 'lt', + '>' : 'gt', + '<=' : 'le', + '>=' : 'ge', + '==' : 'eq', + '!=' : 'ne' + } + logical_op = { + '&' : 'and', + '|' : 'or' + } + + for parameters in filters_dict.values(): + if parameters.get('operator1'): + operator1_to_change = parameters.get('operator1') + if operator1_to_change in operators.keys(): + parameters['operator1'] = operators[operator1_to_change] + else: raise ValueError(f'This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('operator2'): + operator2_to_change = parameters.get('operator2') + if operator2_to_change in operators.keys(): + parameters['operator2'] = operators[operator2_to_change] + else: raise ValueError(f'This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('operators_conjuction'): + logical_op_to_change = parameters.get('operators_conjuction') + if logical_op_to_change in logical_op.keys(): + parameters['operators_conjuction'] = logical_op[logical_op_to_change] + else: raise ValueError(f'This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!') + if parameters.get('filters_conjuction'): + logical_fl_to_change = parameters.get('filters_conjuction') + if logical_fl_to_change in logical_op.keys(): + parameters['filters_conjuction'] = logical_op[logical_fl_to_change] + else: raise ValueError(f'This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!') + + return filters_dict + + + def make_filter_for_api( + self, + filters: dict + ) -> 'str': + """ + Function changing type of operators to match MS API style as 'str' passing to URL call. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter parameter to API. + """ + + filter_text = '' + filters_mod = self.operators_mapping(filters) + + for column, parameters in filters_mod.items(): + if parameters.get('dtype') in ['datetime','date']: + from_date1 = datetime.strptime(parameters.get('value1'),'%Y-%m-%d').isoformat() + filter_text = filter_text + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " + if parameters.get('operator2'): + from_date2 = datetime.strptime(parameters.get('value2'),'%Y-%m-%d').isoformat() + filter_text = filter_text + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + elif parameters.get('dtype') not in ['datetime','date']: + filter_text = filter_text + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" + if parameters.get('operator2'): + filter_text = filter_text + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" + if parameters.get('filters_conjuction'): + filter_text = filter_text + f"{parameters.get('filters_conjuction')} " + + return filter_text + + + def make_filter_for_df( + self, + filters: dict = None, + ) -> 'str': + """ + Function changing dict operators into pandas DataFrame filters. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter to DataFrame. 
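Both builders return plain strings: `make_filter_for_api` produces the filter clause sent to the list API, while the `df.loc[...]` string from `make_filter_for_df` is later passed to `eval` inside `list_item_to_df` when the unfiltered `get_all` fallback is used. The expected outputs, copied from the tests above:

```python
# Outputs as asserted in tests/integration/test_sharepoint.py.
# make_filter_for_api({"date_column": {"dtype": "datetime", "operator1": ">", "value1": "2023-01-01"}})
#   -> "date_column gt datetime'2023-01-01T00:00:00' "
# make_filter_for_df({"column1": {"operator1": ">", "value1": 10, "filters_conjuction": "&"},
#                     "column2": {"operator1": "<", "value1": 20}})
#   -> "df.loc[(df.column1 > '10')&(df.column2 < '20')]"
```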
+ """ + + filter_in_df = 'df.loc[' + + for column, parameters in filters.items(): + filter_in_df = filter_in_df + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + + if parameters.get('operator2'): + filter_in_df = filter_in_df + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + + if parameters.get('filters_conjuction'): + filter_in_df = filter_in_df + ')' + parameters.get('filters_conjuction') + + else: + filter_in_df = filter_in_df + ')' + + filter_in_df = filter_in_df + ']' + + return filter_in_df + + + def list_item_to_df( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + ): + """ + Method to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + + Returns: + pd.DataFrame + """ + + # checking if the passed filters dictionary is correctly built + if filters is not None: + self.check_filters(filters) + # checking that the filter parameters are included in the desired field parameters + for key in filters: + if key not in required_fields: + raise AttributeError(f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!") + + # changing the body of the filter for MS API call + filter_text = self.make_filter_for_api(filters) + + download_all = False + + # extracting requeird_fields SP_List objects + selected_fields = self.select_expandable_user_fields(list_title=list_title, + site_url=site_url, + required_fields=required_fields, + field_property=field_property) + + try: + # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. + if filters is None: + raise ValueError("There is no filter. Starting extraxction all data") + else: + list_items= self.list_object.items.filter(filter_text).select(selected_fields["FieldInternalNames"]).top(row_count).expand(selected_fields["FieldToExpand"]) + self.ctx.load(list_items) + self.ctx.execute_query() + + except (ClientRequestException, ValueError) as e: + # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFreame level. 
+ logger.info(f"Exception SPQueryThrottledException occurred: {e}") + list_items = self.list_object.items.get_all(row_count,log_of_progress).select(selected_fields["FieldInternalNames"]).expand(selected_fields["FieldToExpand"]) + self.ctx.load(list_items) + self.ctx.execute_query() + download_all = True + + df = pd.DataFrame([self._unpack_fields(row_item,selected_fields) for row_item in list_items]) + + if download_all == True and filters is not None: + # Filter for desired range of created date and for factory Namyslow PL + self.logger.info("Filtering df with all data output") + filter_for_df = self.make_filter_for_df(filters) + df = eval(filter_for_df) + + return df \ No newline at end of file diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 02791852b..1e67d96a6 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF +from .sharepoint import SharepointToDF, SharepointListToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py index 7ba9c4d41..cb7f30a19 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,16 +1,17 @@ +from typing import List +import pandas as pd import copy import json import os -from typing import List +import re -import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs from ..exceptions import ValidationError -from ..sources import Sharepoint +from ..sources import Sharepoint, SharepointList from .azure_key_vault import AzureKeyVaultSecret from ..utils import add_viadot_metadata_columns @@ -230,3 +231,171 @@ def run( df = self.df_replace_special_chars(df) self.logger.info(f"Successfully converted data to a DataFrame.") return df + + + +class SharepointListToDF(Task): + """ + Task to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. 
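After downloading, the task renames the raw `InternalName` columns to their display `Title`s; when two fields share the same `Title`, both fall back to a de-camel-cased version of their internal name (see `change_column_name` and `_convert_camel_case_to_words` below). A small illustration with invented field names:

```python
# Invented field names - the real mapping is built from the list's field properties.
internal_to_title = {"Title": "Name", "Status0": "Status", "StatusDate": "Status"}
# "Status0" and "StatusDate" collide on the display name "Status", so instead they become:
#   _convert_camel_case_to_words("Status0")    -> "Status 0"
#   _convert_camel_case_to_words("StatusDate") -> "Status Date"
```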
+ + Returns: + pandas DataFrame + """ + + def __init__( + self, + path: str = None, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.row_count = row_count + self.vault_name = vault_name + self.credentials_secret = credentials_secret + + if not credentials_secret: + # Attempt to read a default for the service principal secret name + try: + credentials_secret = PrefectSecret("SHAREPOINT-CERT").run() + except ValueError: + pass + + if credentials_secret: + credentials_str = AzureKeyVaultSecret( + secret = self.credentials_secret, vault_name=self.vault_name + ).run() + self.credentials = json.loads(credentials_str) + + + super().__init__( + *args, + **kwargs, + ) + + + def __call__(self): + """Download Sharepoint_List data to a .parquet file""" + super().__call__(self) + + + def _convert_camel_case_to_words( + self, + input_str: str) -> str: + + self.input_str = input_str + + words = re.findall(r'[A-Z][a-z]*|[0-9]+', self.input_str) + converted = ' '.join(words) + + return converted + + + def change_column_name( + self, + df: pd.DataFrame = None, + ): + s = SharepointList() + list_fields = s.get_fields( + list_title= self.list_title, + site_url= self.site_url, + required_fields= self.required_fields, + ) + + self.logger.info("Changing columns names") + column_names_correct = [field.properties['Title'] for field in list_fields] + column_names_code = [field.properties['InternalName'] for field in list_fields] + dictionary = dict(zip(column_names_code, column_names_correct)) + + # If duplicates in names from "Title" take "InternalName" + value_count = {} + duplicates = [] + + for key, value in dictionary.items(): + if value in value_count: + if value_count[value] not in duplicates: + duplicates.append(value_count[value]) + duplicates.append(key) + else: + value_count[value] = key + + for key in duplicates: + dictionary[key] = self._convert_camel_case_to_words(key) + + # Rename columns names inside DataFrame + df = df.rename(columns=dictionary) + + return df + + + def run( + self, + ) -> None: + """ + Run Task SharepointListToDF. + + Returns: + pd.DataFrame + """ + + s = SharepointList(credentials=self.credentials,) + df_raw = s.list_item_to_df( + list_title = self.list_title, + site_url = self.site_url, + required_fields = self.required_fields, + field_property = self.field_property, + filters = self.filters, + row_count = self.row_count, + ) + + df = self.change_column_name(df=df_raw) + self.logger.info("Successfully changed structure of the DataFrame") + + return df \ No newline at end of file diff --git a/viadot/utils.py b/viadot/utils.py index 2b4c80538..49fa33e09 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -449,3 +449,14 @@ def wrapper(*args, **kwargs) -> pd.DataFrame: return wrapper return decorator + + +def get_nested_dict(d): + if isinstance(d, dict): + for lvl in d.values(): + if isinstance(lvl, dict): + return get_nested_dict(lvl) + else: + return d + else: + return None \ No newline at end of file
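`get_nested_dict` inspects the first value of a mapping: if that value is itself a dictionary it recurses into it, otherwise it returns the mapping as-is, and it returns `None` for non-dict input. This is how `_unpack_fields` in `SharepointList` distinguishes plain cell values from expanded lookup/user fields, whose configured `FieldProperty` (e.g. `"Title"`) is then extracted. A quick illustration with invented field values:

```python
from viadot.utils import get_nested_dict

# A scalar is not a dict, so None is returned and the caller keeps the value as-is.
assert get_nested_dict("Done") is None

# For an expanded user/lookup field the innermost dictionary is handed back, from which
# _unpack_fields() then picks the configured FieldProperty (e.g. "Title").
value = {"Editor": {"Title": "Jane Doe", "Id": 17}}
assert get_nested_dict(value) == {"Title": "Jane Doe", "Id": 17}
```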