From 3991b5f0ef4854b2f730a5ea8ae63ec4e540a301 Mon Sep 17 00:00:00 2001
From: marcinpurtak
Date: Fri, 10 Nov 2023 16:09:29 +0100
Subject: [PATCH] Sharepoint list connector extension for multichoice fields
 with some small fixes and docstring updates

---
 CHANGELOG.md                         |  19 ++
 tests/integration/test_sharepoint.py | 138 +++++++++++---
 viadot/flows/sharepoint_to_adls.py   |  80 ++++----
 viadot/sources/sharepoint.py         | 263 ++++++++++++++++++++-------
 viadot/tasks/sharepoint.py           | 100 ++++++++--
 5 files changed, 457 insertions(+), 143 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 507c590cf..208908bb9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+- Modified `SharepointToADLS` flow class:
+  -> docstrings update
+  -> changed `key_value_param: bool = False` to prevent a forced KV store append
+- Modified `SharepointListToADLS` flow class:
+  -> docstrings update
+  -> changed `key_value_param: bool = False` to prevent a forced KV store append
+- Modified `SharepointList` source class:
+  -> docstrings update
+  -> Changed `_unpack_fields` method to handle the Sharepoint MultiChoiceField type + small improvements
+  -> Changed `get_fields` method to handle special characters - different approach to calling get() and execute_query()
+  -> Renamed `select_expandable_user_fields` method to `select_fields` + update for the MultiChoiceField type
+  -> Changed `check_filters` method error messages and added more checks
+  -> Changed `operators_mapping` method error messages
+  -> Changed `make_filter_for_df` method error messages
+- Modified `SharepointListToDF` task class:
+  -> docstrings update
+  -> Added `_rename_duplicated_fields` method to find and rename duplicated columns
+
 ## [0.4.21] - 2023-10-26
 ### Added
diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py
index 502ffded0..38fdfa8a7 100644
--- a/tests/integration/test_sharepoint.py
+++ b/tests/integration/test_sharepoint.py
@@ -2,7 +2,6 @@
 import re
 
 import pandas as pd
-from copy import deepcopy
 import pytest
 from prefect.tasks.secrets import PrefectSecret
 
@@ -10,7 +9,7 @@
 from viadot.exceptions import CredentialError
 from viadot.sources import Sharepoint
 from viadot.task_utils import df_get_data_types_task
-from viadot.tasks.sharepoint import SharepointToDF
+from viadot.tasks.sharepoint import SharepointToDF, SharepointListToDF
 from viadot.sources import SharepointList
 
 
@@ -168,10 +167,11 @@ def test_get_data_types(file_name):
     assert "String" in dtypes
 
 
+### SECTION FOR TESTING SHAREPOINT LIST CONNECTOR ###
 @pytest.fixture(scope="session")
 def sharepoint_list():
     """
-    Fixture for creating a Sharepoint class instance.
+    Fixture for creating a SharepointList class instance.
     The class instance can be used within test functions to interact with Sharepoint.
""" spl = SharepointList() @@ -187,15 +187,31 @@ def test_valid_filters(sharepoint_list): assert result is True -def test_invalid_dtype(sharepoint_list): +def test_filters_missing_dtype(sharepoint_list): + filters = { + "filter1": {"operator1": ">", "value1": 10}, + } + with pytest.raises( + ValueError, + match=re.escape("dtype for filter1 is missing!"), + ): + sharepoint_list.check_filters(filters) + + +def test_filters_invalid_dtype(sharepoint_list): filters = { "filter1": {"dtype": "list", "operator1": ">", "value1": 10}, } - with pytest.raises(ValueError, match="dtype not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "dtype not allowed! Expected: ['datetime', 'date', 'bool', 'int', 'float', 'complex', 'str'] got: list ." + ), + ): sharepoint_list.check_filters(filters) -def test_missing_operator1(sharepoint_list): +def test_filters_missing_operator1(sharepoint_list): filters = { "filter1": {"dtype": "int", "value1": 10}, } @@ -203,23 +219,28 @@ def test_missing_operator1(sharepoint_list): sharepoint_list.check_filters(filters) -def test_invalid_operator1(sharepoint_list): +def test_filters_invalid_operator1(sharepoint_list): filters = { "filter1": {"dtype": "int", "operator1": "*", "value1": 10}, } - with pytest.raises(ValueError, match="Operator type not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator1 type not allowed! Expected: ['<', '>', '<=', '>=', '==', '!='] got: * ." + ), + ): sharepoint_list.check_filters(filters) -def test_missing_value1(sharepoint_list): +def test_filters_missing_value1(sharepoint_list): filters = { "filter1": {"dtype": "int", "operator1": ">", "value1": None}, } - with pytest.raises(ValueError, match="Value for operator1 is missing!"): + with pytest.raises(ValueError, match="Value1 for operator1 is missing!"): sharepoint_list.check_filters(filters) -def test_missing_operators_conjuction(sharepoint_list): +def test_filters_missing_operators_conjuction(sharepoint_list): filters = { "filter1": { "dtype": "int", @@ -229,11 +250,16 @@ def test_missing_operators_conjuction(sharepoint_list): "value2": 20, }, } - with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator for conjuction is missing! Expected: ['&', '|'] got empty." + ), + ): sharepoint_list.check_filters(filters) -def test_invalid_operators_conjuction(sharepoint_list): +def test_filters_invalid_operators_conjuction(sharepoint_list): filters = { "filter1": { "dtype": "int", @@ -244,11 +270,16 @@ def test_invalid_operators_conjuction(sharepoint_list): "operators_conjuction": "!", }, } - with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + with pytest.raises( + ValueError, + match=re.escape( + "Operator for conjuction not allowed! Expected: ['&', '|'] got ! ." + ), + ): sharepoint_list.check_filters(filters) -def test_invalid_filters_conjuction(sharepoint_list): +def test_filters_conjuction_not_allowed(sharepoint_list): filters = { "filter1": { "dtype": "int", @@ -258,7 +289,32 @@ def test_invalid_filters_conjuction(sharepoint_list): }, } with pytest.raises( - ValueError, match="Filters operators for conjuction not allowed!" + ValueError, + match=re.escape( + "Filters conjuction allowed only when more then one filter provided!" 
+        ),
+    ):
+        sharepoint_list.check_filters(filters)
+
+
+def test_filters_invalid_conjuction(sharepoint_list):
+    filters = {
+        "filter1": {
+            "dtype": "int",
+            "value1": 10,
+            "operator1": ">",
+            "filters_conjuction": "!",
+        },
+        "filter2": {
+            "dtype": "int",
+            "operator1": "==",
+        },
+    }
+    with pytest.raises(
+        ValueError,
+        match=re.escape(
+            "Filter operator for conjuction not allowed! Expected: ['&', '|'] got: !."
+        ),
     ):
         sharepoint_list.check_filters(filters)
 
@@ -266,27 +322,47 @@ def test_valid_mapping(sharepoint_list):
     filters = {
         "filter1": {
+            "dtype": "int",
+            "value1": 10,
+            "value2": 20,
             "operator1": ">",
             "operator2": "<=",
             "operators_conjuction": "&",
             "filters_conjuction": "|",
         },
-        "filter2": {"operator1": "==", "operator2": "!=", "operators_conjuction": "|"},
+        "filter2": {
+            "dtype": "int",
+            "value1": 30,
+            "value2": 0,
+            "operator1": "==",
+            "operator2": "!=",
+            "operators_conjuction": "|",
+        },
     }
     expected_result = {
         "filter1": {
+            "dtype": "int",
+            "value1": 10,
+            "value2": 20,
             "operator1": "gt",
             "operator2": "le",
             "operators_conjuction": "and",
             "filters_conjuction": "or",
         },
-        "filter2": {"operator1": "eq", "operator2": "ne", "operators_conjuction": "or"},
+        "filter2": {
+            "dtype": "int",
+            "value1": 30,
+            "value2": 0,
+            "operator1": "eq",
+            "operator2": "ne",
+            "operators_conjuction": "or",
+        },
     }
-    result = sharepoint_list.operators_mapping(deepcopy(filters))
+    result = sharepoint_list.operators_mapping(filters)
     assert result == expected_result
 
 
-def test_invalid_comparison_operator(sharepoint_list):
+def test_operators_mapping_invalid_comparison_operator(sharepoint_list):
     filters = {
         "filter1": {
             "operator1": "*",
             "operator2": "<=",
             "operators_conjuction": "&",
             "filters_conjuction": "|",
         },
     }
     error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!"
     with pytest.raises(ValueError, match=re.escape(error_message)):
-        sharepoint_list.operators_mapping(deepcopy(filters))
+        sharepoint_list.operators_mapping(filters)
 
 
-def test_invalid_logical_operator(sharepoint_list):
+def test_operators_mapping_invalid_logical_operator(sharepoint_list):
     filters = {
         "filter1": {
             "operator1": ">",
             "operator2": "<=",
             "operators_conjuction": "!",
             "filters_conjuction": "|",
         },
     }
-    error_message = "This conjuction(logical) operator: ! is not allowed. Please read the function documentation for details!"
+    error_message = "This conjuction (logical) operator: ! is not allowed. Please read the function documentation for details!"
+    with pytest.raises(ValueError, match=re.escape(error_message)):
+        sharepoint_list.operators_mapping(filters)
+
+
+def test_operators_mapping_invalid_filters_logical_operator(sharepoint_list):
+    filters = {
+        "filter1": {
+            "operator1": ">",
+            "operator2": "<=",
+            "operators_conjuction": "&",
+            "filters_conjuction": "!",
+        },
+    }
+    error_message = "This filters conjuction (logical) operator: ! is not allowed. Please read the function documentation for details!"
     with pytest.raises(ValueError, match=re.escape(error_message)):
-        sharepoint_list.operators_mapping(deepcopy(filters))
+        sharepoint_list.operators_mapping(filters)
 
 
 def test_single_filter_datetime_api(sharepoint_list):
diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py
index 410538e7b..6191317d0 100644
--- a/viadot/flows/sharepoint_to_adls.py
+++ b/viadot/flows/sharepoint_to_adls.py
@@ -42,6 +42,7 @@ def __init__(
         if_exists: str = "replace",
         validate_df_dict: dict = None,
         timeout: int = 3600,
+        key_value_param: bool = False,
         *args: List[any],
         **kwargs: Dict[str, Any],
     ):
@@ -69,6 +70,7 @@ def __init__(
             dataframe. If defined, triggers the `validate_df` task from task_utils.
             Defaults to None.
             timeout(int, optional): The amount of time (in seconds) to wait while running this task before
             a timeout occurs. Defaults to 3600.
+            key_value_param (bool, optional): Whether to store the (adls_dir_path, adls_file_path)
+                pair in the KV Store. Defaults to False.
         """
         # SharepointToDF
         self.if_empty = if_empty
@@ -86,6 +88,7 @@ def __init__(
         self.adls_sp_credentials_secret = adls_sp_credentials_secret
         self.if_exists = if_exists
         self.output_file_extension = output_file_extension
+        self.key_value_param = key_value_param
         self.now = str(pendulum.now("utc"))
         if self.local_dir_path is not None:
             self.local_file_path = (
@@ -177,7 +180,8 @@ def gen_flow(self) -> Flow:
         file_to_adls_task.set_upstream(df_to_file, flow=self)
         json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
 
-        set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
+        if self.key_value_param:
+            set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
 
     @staticmethod
     def slugify(name):
@@ -188,42 +192,35 @@ class SharepointListToADLS(Flow):
     def __init__(
         self,
         name: str,
-        list_title: str = None,
-        site_url: str = None,
+        list_title: str,
+        site_url: str,
+        path: str,
+        adls_dir_path: str,
+        adls_file_name: str,
+        filters: dict = None,
         required_fields: List[str] = None,
         field_property: str = "Title",
-        filters: dict = None,
         row_count: int = 5000,
+        adls_sp_credentials_secret: str = None,
         sp_cert_credentials_secret: str = None,
         vault_name: str = None,
-        path: str = None,
-        adls_dir_path: str = None,
-        adls_file_name: str = None,
-        adls_sp_credentials_secret: str = None,
         overwrite_adls: bool = True,
         output_file_extension: str = ".parquet",
         validate_df_dict: dict = None,
+        key_value_param: bool = False,
         *args: List[any],
         **kwargs: Dict[str, Any],
     ):
-        """
-        Run Flow SharepointListToADLS.
+        """Run Flow SharepointListToADLS.
 
         Args:
-            name (str): Prefect flow name.
-            list_title (str): Title of Sharepoint List. Default to None.
-            site_url (str): URL to set of Sharepoint Lists. Default to None.
-            required_fields (List[str]): Required fields(columns) need to be extracted from
-                Sharepoint List. Default to None.
-            field_property (List[str]): Property to expand fields with expand query method.
-                For example: User fields could be expanded and "Title"
-                or "ID" could be extracted
-                -> usefull to get user name instead of ID
-                All properties can be found under list.item.properties.
-                WARNING! Field types and properties might change which could
-                lead to errors - extension of sp connector would be required.
-                Default to ["Title"]
-            filters (dict): Dictionary with operators which filters the SharepointList output.
+            name (str): Prefect flow name.
+            list_title (str): Title of the Sharepoint List.
+            site_url (str): URL to the set of Sharepoint Lists.
+            path (str): Local file path.
+            adls_dir_path (str): Azure Data Lake destination folder/catalog path.
+            adls_file_name (str): Name of the file in ADLS.
+            filters (dict, optional): Dictionary with operators which filters the SharepointList output.
                 allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str')
                 allowed conjuction: ('&','|')
                 allowed operators: ('<','>','<=','>=','==','!=')
                 Example how to build the filter dictionary:
                     filters = {
                         'Column_name_1': {
                             'dtype': 'datetime',
                             'value1': 'YYYY-MM-DD',
                             'value2': 'YYYY-MM-DD',
                             'operator1': '>=',
                             'operator2': '<=',
                             'operators_conjuction': '&',
                             'filters_conjuction': '&',
                         },
                         'Column_name_2': {
                             'dtype': 'str',
                             'value1': 'NM-PL',
                             'operator1': '==',
                         },
                     }
-            row_count (int): Number of downloaded rows in single request. Default to 5000.
-            sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None.
-            vault_name (str): KeyVaultSecret name. Default to None.
-            path (str): Local file path. Default to None.
-            adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None.
-            adls_file_name (str, optional): Name of file in ADLS. Defaults to None.
-            adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
-                ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID,
-                CLIENT_SECRET) for the Azure Data Lake. Defaults to None.
-            overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True.
+                Defaults to None.
+            required_fields (List[str], optional): Required fields (columns) to be extracted from
+                the Sharepoint List. Defaults to None.
+            field_property (str, optional): Property to expand fields with the expand query method.
+                For example, User fields could be expanded and "Title"
+                or "ID" could be extracted
+                -> useful to get the user name instead of the ID.
+                All properties can be found under list.item.properties.
+                WARNING! Field types and properties might change, which could
+                lead to errors - an extension of the SP connector would be required.
+                Defaults to "Title".
+            row_count (int, optional): Number of downloaded rows in a single request. Defaults to 5000.
+            adls_sp_credentials_secret (str, optional): Credentials to connect to Azure ADLS.
+                If not passed, credentials are taken from your .config/credentials.json. Defaults to None.
+            sp_cert_credentials_secret (str, optional): Credentials to verify the Sharepoint connection.
+                If not passed, credentials are taken from your .config/credentials.json. Defaults to None.
+            vault_name (str, optional): KeyVaultSecret name. Defaults to None.
+            overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True.
+            output_file_extension (str, optional): Extension of the resulting file. Defaults to ".parquet".
+            validate_df_dict (dict, optional): A dictionary with validation rules; if provided, an extra
+                DataFrame validation is run before the ADLS upload. Defaults to None.
+            key_value_param (bool, optional): Whether to store the (adls_dir_path, adls_file_path)
+                pair in the KV Store. Defaults to False.
 
         Returns:
             .parquet file inside ADLS.
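For context, a minimal usage sketch of the reworked flow signature. This is illustrative only: the list title, site URL, and paths are hypothetical, the import assumes the usual `viadot.flows` re-export, and `.run()` is the standard Prefect 1.x flow invocation. Note that every column used in `filters` must also appear in `required_fields`, otherwise `list_item_to_df` raises an `AttributeError`.

```python
from viadot.flows import SharepointListToADLS

flow = SharepointListToADLS(
    name="sharepoint list to adls",
    list_title="Machines",  # hypothetical list title
    site_url="https://company.sharepoint.com/sites/plant/",  # hypothetical site
    path="machines.parquet",
    adls_dir_path="raw/sharepoint",
    adls_file_name="machines.parquet",
    required_fields=["Factory", "Created"],  # must include every filtered column
    filters={
        "Factory": {
            "dtype": "str",
            "value1": "NM-PL",
            "operator1": "==",
        },
    },
)
flow.run()
```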
@@ -280,6 +288,7 @@ def __init__(
         self.overwrite = overwrite_adls
         self.adls_sp_credentials_secret = adls_sp_credentials_secret
         self.output_file_extension = output_file_extension
+        self.key_value_param = key_value_param
         self.now = str(pendulum.now("utc"))
         if self.path is not None:
             self.local_file_path = (
@@ -370,7 +379,8 @@ def gen_flow(self) -> Flow:
         file_to_adls_task.set_upstream(df_to_file, flow=self)
         json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
 
-        set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
+        if self.key_value_param:
+            set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
 
     @staticmethod
     def slugify(name):
diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py
index 7f57bd658..7f1bc523c 100644
--- a/viadot/sources/sharepoint.py
+++ b/viadot/sources/sharepoint.py
@@ -85,8 +85,10 @@ def download_file(
 
 class SharepointList(Source):
     """
-    A Sharepoint_List class to connect and download data from sharpoint lists.
-
+    A Sharepoint_List class to connect and download data from Sharepoint lists.
+    Warning!
+    Please be careful when selecting column names: once a Sharepoint list is opened in a browser,
+    it may display the columns in different languages. Because of that, the resulting file or
+    output might have different column names than the ones you see in the browser.
 
     Args:
         credentials (dict): Credentials should include:
            - "tenant"
@@ -102,6 +104,20 @@ def __init__(
         *args,
         **kwargs,
     ):
+        """Initialize the SharepointList source.
+
+        Args:
+            credentials (Dict[str, Any], optional): Credentials should include:
+                - "tenant"
+                - "client_id"
+                - "scopes"
+                - "thumbprint"
+                - "private_key"
+
+        Raises:
+            CredentialError: If no credentials are passed
+                and the local config does not contain them either.
+        """
         DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT")
         credentials = credentials or DEFAULT_CREDENTIALS
         if credentials is None:
@@ -109,11 +125,16 @@ def __init__(
 
         super().__init__(*args, credentials=credentials, **kwargs)
 
-    def get_connection(
-        self,
-        site_url: str = None,
-    ):
-        # Connecting into Sharepoint with AuthenticationContext
+    def get_connection(self, site_url: str):
+        """Establish a connection to Sharepoint with AuthenticationContext.
+
+        Args:
+            site_url (str): URL of the Sharepoint site.
+
+        Returns:
+            ctx: Authentication context.
+        """
+        logger.info("Connecting into Sharepoint with AuthenticationContext")
         try:
             auth_context = AuthenticationContext(site_url)
             auth_context.with_client_certificate(
@@ -132,51 +153,80 @@ def get_connection(
 
         return self.ctx
 
-    # Function for extracting list items from search fields
     def _unpack_fields(
         self,
         list_item,
-        selected_fields: dict = None,
+        selected_fields: dict,
     ) -> dict:
+        """Extract and unpack list items from the search fields.
+
+        Args:
+            list_item (office365 ListItem): A single office365 list item object (row).
+            selected_fields (dict): A dict with the fields selected for ingestion,
+                generated by SharepointList.select_fields().
+
+        Raises:
+            ValueError: "Check if given field property is valid!"
+            ValueError: "Got a nested dict for a not recognized type of field!
+                Check the field types in the source."
+            ValueError: "Got empty properties for the list item."
+
+        Returns:
+            dict: A dictionary with Column: Value pairs for each row from the list.
+        """
         # Creating the body of dictionary
         new_dict = dict()
         # For loop scanning the propertys of searching fields
         item_values_dict = list_item.properties
-        for field, val in item_values_dict.items():
-            nested_dict = get_nested_dict(val)
-            # Check if field has expandable type
-            if field in selected_fields["FieldToExpand"]:
+        if item_values_dict:
+            for field, val in item_values_dict.items():
+                nested_dict = get_nested_dict(val)
                 # Check if the values are nested
                 if nested_dict != None:
-                    # It might be that there are different field properties than expected
-                    nested_value = nested_dict.get(
-                        selected_fields["FieldExpandProperty"]
-                    )
-                    if nested_value != None:
-                        new_dict[field] = nested_value
+                    # Check if the field has an expandable type
+                    if field in selected_fields["FieldToExpand"]:
+                        # It might be that there are different field properties than expected
+                        nested_value = nested_dict.get(
+                            selected_fields["FieldExpandProperty"]
+                        )
+                        if nested_value != None:
+                            new_dict[field] = nested_value
+                        else:
+                            raise ValueError("Check if given field property is valid!")
+                    elif field in selected_fields["MultiChoiceField"]:
+                        # A MultiChoice field can have more than one selection.
+                        new_dict[field] = ";".join(nested_dict.values())
                     else:
-                        logger.info("Property of the extandable field not recognized!")
-                        raise ValueError("Check if given field property is valid!")
-                elif field in selected_fields["MultiChoiceField"]:
-                    # Field type of multi choice could have more than 1 selection.
-                    new_dict[field] = ";".join(nested_dict.values())
+                        raise ValueError(
+                            "Got a nested dict for a not recognized type of field! Check the field types in the source."
+                        )
                 else:
-                    raise ValueError(
-                        "Get nested dict for not recognized type of field! Check field types in the source"
-                    )
-            else:
-                new_dict[field] = val
-
+                    new_dict[field] = val
+        else:
+            raise ValueError(
+                "Got empty properties for the list item. Check if the list_item collection contains any data (item objects)."
+            )
         return new_dict
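To make the unpacking rules concrete, here is a short, self-contained sketch that mirrors the `_unpack_fields` logic; the raw office365 property shapes in the sample data are simplified assumptions for illustration only.

```python
def unpack_row(item_properties: dict, selected_fields: dict) -> dict:
    """Simplified stand-in for SharepointList._unpack_fields (illustration only)."""
    row = {}
    for field, val in item_properties.items():
        if isinstance(val, dict):  # plays the role of get_nested_dict()
            if field in selected_fields["FieldToExpand"]:
                # Expanded User* field: keep only the configured property.
                row[field] = val[selected_fields["FieldExpandProperty"]]
            elif field in selected_fields["MultiChoiceField"]:
                # MultiChoice field: join all selections into a single cell.
                row[field] = ";".join(val.values())
        else:
            row[field] = val
    return row


item_properties = {  # assumed, simplified shapes of list_item.properties
    "Title": "Row 1",
    "AssignedTo": {"Title": "Jane Doe", "Id": "7"},
    "Colors": {"0": "red", "1": "green"},
}
selected_fields = {
    "FieldToExpand": ["AssignedTo"],
    "FieldExpandProperty": "Title",
    "MultiChoiceField": ["Colors"],
}
print(unpack_row(item_properties, selected_fields))
# {'Title': 'Row 1', 'AssignedTo': 'Jane Doe', 'Colors': 'red;green'}
```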
 
     def get_fields(
         self,
-        list_title: str = None,
-        site_url: str = None,
+        list_title: str,
+        site_url: str,
         required_fields: List[str] = None,
-    ):
-        ctx = self.get_connection(site_url=site_url)
+    ) -> List:
+        """
+        Get a list of field objects from a Sharepoint list.
+        Returns all available fields if required_fields is not passed,
+        or only the ones listed in required_fields.
+
+        Args:
+            list_title (str): Name of the Sharepoint list.
+            site_url (str): URL of the Sharepoint list, with "/" at the end.
+            required_fields (List[str], optional): List of required fields to ingest.
+                All fields are taken if not passed. Defaults to None.
+
+        Returns:
+            List: A list of office365 Sharepoint list field objects.
+        """
+        ctx = self.get_connection(site_url=site_url)
         # Get list of lists object by List Title
         self.list_object = ctx.web.lists.get_by_title(list_title)
         list_fields_all = self.list_object.fields
@@ -200,18 +250,32 @@ def get_fields(
 
     def select_fields(
         self,
-        list_title: str = None,
-        site_url: str = None,
+        list_title: str,
+        site_url: str,
         required_fields: List[str] = None,
         field_property: str = "Title",
     ) -> dict:
         """
         Method to create a data structure for handling info about the
-        selection of fields with details about possible expansion for more data or details.
+        selection of fields, with details about possible expansion for more data or details.
         Field types to extract more values can be: "User*", "MultiChoice"
         field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more.
             -> more properties can be discovered by getting list.item.properties.
-        Default to "Title"
+
+        Args:
+            list_title (str): Name of the Sharepoint list.
+            site_url (str): URL of the Sharepoint site.
+            required_fields (List[str], optional): List of required fields to ingest. Defaults to None.
+            field_property (str, optional): Property to extract from nested fields,
+                like a column of type User*. Defaults to "Title".
+
+        Returns:
+            dict: selected_fields = {
+                "FieldInternalNames": list of fields to select, with their InternalNames (from the API),
+                "FieldToExpand": fields which can be expanded to get more data from the API,
+                "FieldExpandProperty": property of the expandable field which will be extracted,
+                "MultiChoiceField": list of fields which can have multiple values in one row,
+            }
         """
 
         list_fields = self.get_fields(
@@ -248,30 +312,73 @@ def select_fields(
 
     def check_filters(
         self,
-        filters: dict = None,
+        filters: dict,
     ) -> bool:
         """
         Function to check if the filters dict is valid.
-        example1: if operator2 is present value2 must be in place as well
-        example2: if dtype is not on allowed list it will throw an error
+        Please check and apply only allowed filter settings:
+            allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"]
+            allowed_conjuction = ["&", "|"]
+            allowed_operators = ["<", ">", "<=", ">=", "==", "!="]
+        An operator conjuction is only possible if there are two values, like: value <= 1 | value == 5.
+        A filter conjuction is only possible if there is more than one filter, e.g. date and creator.
+
+        Args:
+            filters (dict): A dictionary containing filter settings.
+                Example:
+                    filters = {
+                        "Created": {
+                            "dtype": "datetime",
+                            "value1": yesterday_date,
+                            "value2": today_date,
+                            "operator1": ">=",
+                            "operator2": "<=",
+                            "operators_conjuction": "&",
+                            "filters_conjuction": "&",
+                        },
+                        "Factory": {
+                            "dtype": "str",
+                            "value1": "NM-PL",
+                            "operator1": "==",
+                        },
+                    }
+
+        Raises:
+            ValueError: If the dtype is not in the allowed list.
+            ValueError: If comparison operator1 is not in the allowed list.
+            ValueError: If the value for operator1 is missing.
+            ValueError: If comparison operator1 for the first value is missing.
+            ValueError: If comparison operator2 is not in the allowed list.
+            ValueError: If the value for operator2 is missing.
+            ValueError: If comparison operator2 for the second value is missing.
+            ValueError: If the operator conjuction is missing while two values and two operators are passed.
+            ValueError: If the operator conjuction is not in the allowed list.
+            ValueError: If an operator conjuction is provided while only one filter value is given.
+            ValueError: If a filter conjuction is provided without a second filter.
+            ValueError: If the filter conjuction is not in the allowed list.
+
+        Returns:
+            bool: True if all checks passed.
         """
 
         allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"]
         allowed_conjuction = ["&", "|"]
         allowed_operators = ["<", ">", "<=", ">=", "==", "!="]
 
-        for parameters in filters.values():
+        for filter_name, parameters in filters.items():
+            if not parameters.get("dtype"):
+                raise ValueError(f"dtype for {filter_name} is missing!")
             if parameters.get("dtype") not in allowed_dtypes:
                 raise ValueError(
-                    f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}."
+                    f"dtype not allowed! Expected: {allowed_dtypes} got: {parameters.get('dtype')}."
                 )
             if parameters.get("operator1"):
                 if parameters.get("operator1") not in allowed_operators:
                     raise ValueError(
-                        f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}."
+                        f"Operator1 type not allowed! Expected: {allowed_operators} got: {parameters.get('operator1')}."
                     )
                 if not parameters.get("value1"):
-                    raise ValueError("Value for operator1 is missing!")
+                    raise ValueError("Value1 for operator1 is missing!")
             elif not parameters.get("operator1"):
                 raise ValueError("Operator1 is missing!")
             if (
                 not parameters.get("operator2")
                 and parameters.get("operators_conjuction") is not None
             ):
                 raise ValueError(
                     f"Operator conjuction allowed only with more than one filter operator!"
                 )
             if parameters.get("operator2"):
                 if parameters.get("operator2") not in allowed_operators:
                     raise ValueError(
-                        f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}."
+                        f"Operator2 type not allowed! Expected: {allowed_operators} got: {parameters.get('operator2')}."
                     )
                 if not parameters.get("value2"):
-                    raise ValueError("Value for operator2 is missing!")
+                    raise ValueError("Value2 for operator2 is missing!")
                 if not parameters.get("operators_conjuction"):
                     raise ValueError(
-                        f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty."
+                        f"Operator for conjuction is missing! Expected: {allowed_conjuction} got empty."
                     )
                 if parameters.get("operators_conjuction") not in allowed_conjuction:
                     raise ValueError(
-                        f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}."
+                        f"Operator for conjuction not allowed! Expected: {allowed_conjuction} got: {parameters.get('operators_conjuction')}."
                     )
             if parameters.get("filters_conjuction"):
                 if (
@@ -302,27 +409,42 @@ def check_filters(
                     and parameters.get("filters_conjuction") is not None
                 ):
                     raise ValueError(
-                        f"Filters conjuction allowed only with more than one filter column!"
+                        f"Filters conjuction allowed only when more than one filter is provided!"
                     )
                 if parameters.get("filters_conjuction") not in allowed_conjuction:
                     raise ValueError(
-                        f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}."
+                        f"Filter operator for conjuction not allowed! Expected: {allowed_conjuction} got: {parameters.get('filters_conjuction')}."
                     )
 
         return True
 
     def operators_mapping(
         self,
-        filters: dict = None,
+        filters: dict,
     ) -> dict:
         """
         Function for mapping comparison and conjuction (logical) operators
         of filters to the format which is recognized by the Microsoft API.
+        Allowed operators:
+            <
+            >
+            <=
+            >=
+            ==
+            !=
+            "&"
+            "|"
 
         Args:
-            filters (dict): A dictionar which contains operators.
+            filters (dict): A dictionary which contains operators.
+
+        Raises:
+            ValueError: If operator1 is not allowed.
+            ValueError: If operator2 is not allowed.
+            ValueError: If the operators conjuction is not allowed.
+            ValueError: If the filters conjuction is not allowed.
 
         Returns:
-            New modified dict.
+            dict: New modified dict with mapped operators.
         """
 
         filters_dict = deepcopy(filters)
@@ -361,7 +483,7 @@ def operators_mapping(
                         ]
                     else:
                         raise ValueError(
-                            f"This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!"
+                            f"This conjuction (logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!"
                         )
                 if parameters.get("filters_conjuction"):
                     logical_fl_to_change = parameters.get("filters_conjuction")
@@ -369,12 +491,12 @@ def operators_mapping(
                         parameters["filters_conjuction"] = logical_op[logical_fl_to_change]
                     else:
                         raise ValueError(
-                            f"This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!"
+                            f"This filters conjuction (logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!"
                        )
 
        return filters_dict
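The resulting mapping is exercised by `test_valid_mapping` in the test file above; as a quick reference, a sketch of the expected transformation (creating a `SharepointList` instance requires valid Sharepoint certificate credentials):

```python
filters = {
    "filter1": {
        "dtype": "int",
        "value1": 10,
        "operator1": ">",
        "filters_conjuction": "|",
    },
}

mapped = SharepointList().operators_mapping(filters)
# mapped["filter1"]["operator1"]           -> "gt"
# mapped["filter1"]["filters_conjuction"]  -> "or"
```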
 
     def make_filter_for_api(self, filters: dict) -> "str":
         """
         Function changing type of operators to match MS API style as 'str' passing to URL call.
 
         Args:
             filters (dict): A dictionar which contains operators.
 
         Returns:
-            Output as string to pass as filter parameter to API.
+            str: Output as a filtering string to pass as the filter parameter to the API.
         """
 
         filter_text = ""
@@ -422,16 +544,16 @@ def make_filter_for_api(self, filters: dict) -> "str":
 
     def make_filter_for_df(
         self,
-        filters: dict = None,
-    ) -> "str":
+        filters: dict,
+    ) -> str:
         """
-        Function changing dict operators into pandas DataFrame filters.
+        Function changing filters into a pandas DataFrame filtering string, used later to filter the DataFrame.
 
         Args:
-            filters (dict): A dictionar which contains operators.
+            filters (dict): A dictionary which contains operators.
 
         Returns:
-            Output as string to pass as filter to DataFrame.
+            str: Output as a string to pass as a filter to the DataFrame.
         """
 
         filter_in_df = "df.loc["
@@ -469,6 +591,9 @@ def list_item_to_df(
     ):
         """
         Method to extract data from a Sharepoint List into a DataFrame.
+        If filters are passed, the function will try to extract only the filtered data,
+        to reduce the amount of data to transfer. If there is no filter, or a throttling
+        exception occurs (the max-rows-returned limit is reached), then the second workflow
+        starts and downloads all the data, which is filtered later in the DataFrame.
 
         Args:
             list_title (str): Title of Sharepoint List. Default to None.
@@ -504,6 +629,10 @@ def list_item_to_df(
             row_count (int): Number of downloaded rows in single request. Default to 5000.
 
+        Raises:
+            AttributeError: If a filter column is not included in the required fields list.
+            ValueError: If no filter is passed -> all fields will be extracted and filtered later.
+
         Returns:
             pd.DataFrame
         """
@@ -515,7 +644,7 @@ def list_item_to_df(
             for key in filters:
                 if key not in required_fields:
                     raise AttributeError(
-                        f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!"
+                        f"Filter '{key}' column not included in the required fields. Columns used for filtering must be extracted!"
                     )
 
             # changing the body of the filter for MS API call
             filter_text = self.make_filter_for_api(filters)
 
         download_all = False
 
-        # extracting requeird_fields SP_List objects
+        # extracting required_fields SP_List objects
         selected_fields = self.select_fields(
             list_title=list_title,
             site_url=site_url,
@@ -534,7 +663,7 @@ def list_item_to_df(
 
         try:
             # Extract data below 5k rows or max limitation of the specific SP List with basic filtering.
             if filters is None:
-                raise ValueError("There is no filter. Starting extraxction all data")
+                raise ValueError("There is no filter. Switching to extract all fields.")
             else:
                 list_items = (
                     self.list_object.items.filter(filter_text)
diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py
index 2a1cb0bc4..635f9a5ae 100644
--- a/viadot/tasks/sharepoint.py
+++ b/viadot/tasks/sharepoint.py
@@ -245,7 +245,7 @@ class SharepointListToDF(Task):
         field_property (List[str]): Property to expand with expand query method.
             All propertys can be found under list.item.properties.
             Default to ["Title"]
-        filters (dict): Dictionary with operators which filters the SharepointList output.
+        filters (dict, optional): Dictionary with operators which filters the SharepointList output.
             allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str')
             allowed conjuction: ('&','|')
             allowed operators: ('<','>','<=','>=','==','!=')
@@ -277,9 +277,9 @@ class SharepointListToDF(Task):
 
     def __init__(
         self,
-        path: str = None,
-        list_title: str = None,
-        site_url: str = None,
+        path: str,
+        list_title: str,
+        site_url: str,
         required_fields: List[str] = None,
         field_property: str = "Title",
         filters: dict = None,
@@ -289,7 +289,6 @@ def __init__(
         *args,
         **kwargs,
     ):
-
         self.path = path
         self.list_title = list_title
         self.site_url = site_url
@@ -300,6 +299,11 @@ def __init__(
         self.vault_name = vault_name
         self.credentials_secret = credentials_secret
 
+        super().__init__(
+            *args,
+            **kwargs,
+        )
+
         if not credentials_secret:
             # Attempt to read a default for the service principal secret name
             try:
@@ -313,16 +317,65 @@ def __init__(
             ).run()
             self.credentials = json.loads(credentials_str)
 
-        super().__init__(
-            *args,
-            **kwargs,
-        )
-
     def __call__(self):
         """Download Sharepoint_List data to a .parquet file"""
         super().__call__(self)
 
+    def _rename_duplicated_fields(self, df):
+        """
+        Rename duplicated columns in a DataFrame by appending a numerical suffix.
+
+        Checks whether there are fields with the same name but a different letter
+        style (lower/upper case). It might happen that the fields returned by
+        get_fields() differ from the actual list item fields (from the item
+        properties). This is specific to Sharepoint lists: MS allows users to
+        create fields with similar names, differing only in letter case, that
+        hold the same values - for example Id and ID - and the office365 select
+        function does not distinguish upper/lower case.
+
+        Args:
+            df (pd.DataFrame): The input DataFrame with potentially duplicated
+                columns. Columns listed in self.required_fields are not renamed.
+
+        Returns:
+            pd.DataFrame: DataFrame with duplicated columns renamed to ensure uniqueness.
+
+        Example:
+            Given DataFrame df:
+            ```
+                ID  Type  Id
+            0    1     2   3
+            ```
+
+            With required_fields = ['ID', 'Type'], after calling
+            _rename_duplicated_fields(df) the case-insensitive duplicate is renamed:
+            ```
+                ID  Type  Id2
+            0    1     2    3
+            ```
+        """
+        col_to_compare = df.columns.tolist()
+        i = 1
+        for column in df.columns.tolist():
+            if column not in self.required_fields:
+                col_to_compare.remove(column)
+                if column.lower() in [to_cmp.lower() for to_cmp in col_to_compare]:
+                    i += 1
+                    logger.info(f"Found duplicated column: {column}!")
+                    logger.info(f"Renaming from {column} to {column}{i}")
+                    df = df.rename(columns={f"{column}": f"{column}{i}"})
+        return df
+
     def _convert_camel_case_to_words(self, input_str: str) -> str:
+        """
+        Convert internal names joined as camelCase column names to regular words.
+
+        Args:
+            input_str (str): Column name.
+
+        Returns:
+            str: Converted column name.
+        """
 
         self.input_str = input_str
 
         return converted
 
-    def change_column_name(
-        self,
-        df: pd.DataFrame = None,
-    ):
-        s = SharepointList()
+    def change_column_name(self, df: pd.DataFrame, credentials: str = None):
+        """
+        Change coded internal column names (Unicode style) to human-readable names.
+
+        !Warning!
+        Names are taken from the field property Title!
+        Because of that, the resulting column name might differ from the initial name.
+
+        Args:
+            df (pd.DataFrame): A DataFrame with column names loaded from the Sharepoint list.
+            credentials (str, optional): Credentials for establishing the Sharepoint
+                connection. Defaults to None.
+
+        Returns:
+            pd.DataFrame: DataFrame with changed column names.
+        """
+        s = SharepointList(
+            credentials=self.credentials,
+        )
         list_fields = s.get_fields(
             list_title=self.list_title,
             site_url=self.site_url,
 
         # Rename columns names inside DataFrame
         df = df.rename(columns=dictionary)
-
+        # Check again for duplicates
         return df
 
     def run(
         self,
             row_count=self.row_count,
         )
 
-        df = self.change_column_name(df=df_raw)
+        df_col_changed = self.change_column_name(df=df_raw)
+        df = self._rename_duplicated_fields(df=df_col_changed)
         self.logger.info("Successfully changed structure of the DataFrame")
 
         return df
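Finally, a self-contained sketch of the duplicate-renaming behaviour described above. This is a simplified stand-in for `_rename_duplicated_fields`, using a hypothetical frame where `Id` duplicates `ID` only by letter case; the first clashing non-required column gets the suffix `2`, matching the docstring example.

```python
import pandas as pd


def rename_duplicated(df: pd.DataFrame, required_fields: list) -> pd.DataFrame:
    """Simplified stand-in for SharepointListToDF._rename_duplicated_fields."""
    col_to_compare = df.columns.tolist()
    i = 1
    for column in df.columns.tolist():
        if column not in required_fields:
            col_to_compare.remove(column)
            # Case-insensitive clash with a remaining column -> rename with suffix.
            if column.lower() in [c.lower() for c in col_to_compare]:
                i += 1
                df = df.rename(columns={column: f"{column}{i}"})
    return df


df = pd.DataFrame([[1, 2, 3]], columns=["ID", "Type", "Id"])
print(rename_duplicated(df, required_fields=["ID", "Type"]))
#    ID  Type  Id2
# 0   1     2    3
```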