diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 006f22e39..ad2f298bd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -73,6 +73,7 @@ jobs: git config --global user.email 'github-actions[bot]@users.noreply.github.com' git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY black . + git fetch --all git checkout $GITHUB_HEAD_REF git commit -am "🎨 Format Python code with Black" git push diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0be2f82..2ede484c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Added `validate_df` task to task_utils. +- Added `SharepointList` source class. +- Added `SharepointListToDF` task class. +- Added `SharepointListToADLS` flow class. +- Added tests for `SharepointList`. +- Added `get_nested_dict` to utils.py. +- Added `validate_df` task to `SharepointToADLS` class. + ### Fixed ### Changed @@ -618,4 +625,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret - Moved from poetry to pip ### Fixed -- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part +- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part \ No newline at end of file diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py index de793344a..b4503c6e9 100644 --- a/tests/integration/flows/test_bigquery_to_adls.py +++ b/tests/integration/flows/test_bigquery_to_adls.py @@ -1,15 +1,14 @@ import os +from unittest import mock +import pandas as pd import pendulum import pytest -from unittest import mock -import pandas as pd - from prefect.tasks.secrets import PrefectSecret -from viadot.flows import BigQueryToADLS -from viadot.tasks import AzureDataLakeRemove from viadot.exceptions import ValidationError +from viadot.flows import BigQueryToADLS +from viadot.tasks import AzureDataLakeRemove ADLS_DIR_PATH = "raw/tests/" ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet" diff --git a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py index f0661e314..b0c3128c5 100644 --- a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py +++ b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py @@ -1,6 +1,6 @@ from viadot.config import local_config -from viadot.flows import CloudForCustomersReportToADLS from viadot.exceptions import ValidationError +from viadot.flows import CloudForCustomersReportToADLS def test_cloud_for_customers_report_to_adls(): diff --git a/tests/integration/flows/test_customer_gauge_to_adls.py b/tests/integration/flows/test_customer_gauge_to_adls.py index 0e7afd3e2..ac0716d66 100644 --- a/tests/integration/flows/test_customer_gauge_to_adls.py +++ b/tests/integration/flows/test_customer_gauge_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import CustomerGaugeToADLS from viadot.exceptions import ValidationError +from viadot.flows import CustomerGaugeToADLS DATA = { "user_name": ["Jane", "Bob"], diff --git a/tests/integration/flows/test_hubspot_to_adls.py b/tests/integration/flows/test_hubspot_to_adls.py index d960fc079..e0c06c20f 100644 --- a/tests/integration/flows/test_hubspot_to_adls.py +++ b/tests/integration/flows/test_hubspot_to_adls.py @@ -5,8 +5,8 @@ import pandas as pd import pytest -from 
viadot.flows import HubspotToADLS from viadot.exceptions import ValidationError +from viadot.flows import HubspotToADLS DATA = { "id": {"0": "820306930"}, diff --git a/tests/integration/flows/test_mediatool_to_adls.py b/tests/integration/flows/test_mediatool_to_adls.py index d7b5b2658..65cfadf8f 100644 --- a/tests/integration/flows/test_mediatool_to_adls.py +++ b/tests/integration/flows/test_mediatool_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import MediatoolToADLS from viadot.exceptions import ValidationError +from viadot.flows import MediatoolToADLS DATA = {"country": ["DK", "DE"], "sales": [3, 4]} ADLS_FILE_NAME = "test_mediatool.parquet" diff --git a/tests/integration/flows/test_mysql_to_adls.py b/tests/integration/flows/test_mysql_to_adls.py index 942bab99d..c968d48a3 100644 --- a/tests/integration/flows/test_mysql_to_adls.py +++ b/tests/integration/flows/test_mysql_to_adls.py @@ -1,4 +1,5 @@ from unittest import mock + from viadot.flows.mysql_to_adls import MySqlToADLS query = """SELECT * FROM `example-views`.`sales`""" diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py index ec68a1227..8c032f308 100644 --- a/tests/integration/flows/test_salesforce_to_adls.py +++ b/tests/integration/flows/test_salesforce_to_adls.py @@ -2,9 +2,9 @@ from prefect.tasks.secrets import PrefectSecret +from viadot.exceptions import ValidationError from viadot.flows import SalesforceToADLS from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError ADLS_FILE_NAME = "test_salesforce.parquet" ADLS_DIR_PATH = "raw/tests/" diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py index 2c01049e8..4259e5c16 100644 --- a/tests/integration/flows/test_sap_bw_to_adls.py +++ b/tests/integration/flows/test_sap_bw_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import SAPBWToADLS from viadot.exceptions import ValidationError +from viadot.flows import SAPBWToADLS DATA = { "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], diff --git a/tests/integration/flows/test_sap_rfc_to_adls.py b/tests/integration/flows/test_sap_rfc_to_adls.py index ed33fa320..5503b4684 100644 --- a/tests/integration/flows/test_sap_rfc_to_adls.py +++ b/tests/integration/flows/test_sap_rfc_to_adls.py @@ -1,8 +1,8 @@ from viadot.config import local_config +from viadot.exceptions import ValidationError from viadot.flows import SAPRFCToADLS from viadot.sources import AzureDataLake from viadot.tasks import AzureDataLakeRemove -from viadot.exceptions import ValidationError try: import pyrfc diff --git a/tests/integration/flows/test_supermetrics_to_adls.py b/tests/integration/flows/test_supermetrics_to_adls.py index 9738ddeb1..15deaa01a 100644 --- a/tests/integration/flows/test_supermetrics_to_adls.py +++ b/tests/integration/flows/test_supermetrics_to_adls.py @@ -4,8 +4,8 @@ import pytest from prefect.storage import Local -from viadot.flows import SupermetricsToADLS from viadot.exceptions import ValidationError +from viadot.flows import SupermetricsToADLS CWD = os.getcwd() adls_dir_path = "raw/tests/supermetrics" diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py index c18eaad10..0f6705579 100644 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -4,8 +4,8 @@ import pandas as pd import pytest -from viadot.flows import 
VidClubToADLS from viadot.exceptions import ValidationError +from viadot.flows import VidClubToADLS DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} ADLS_FILE_NAME = "test_vid_club.parquet" diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index c784fa682..f9453d7d1 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -1,4 +1,6 @@ import os +import re +from copy import deepcopy import pandas as pd import pytest @@ -6,7 +8,7 @@ from viadot.config import local_config from viadot.exceptions import CredentialError -from viadot.sources import Sharepoint +from viadot.sources import Sharepoint, SharepointList from viadot.task_utils import df_get_data_types_task from viadot.tasks.sharepoint import SharepointToDF @@ -18,7 +20,7 @@ def get_url() -> str: Returns: str: File URL. """ - return local_config["SHAREPOINT"].get("url") + return local_config["SHAREPOINT"].get("file_url") @pytest.fixture(scope="session") @@ -163,3 +165,211 @@ def test_get_data_types(file_name): dtypes = dtypes_map.values() assert "String" in dtypes + + +# Test that get_connection raises an error when invalid credentials are passed to the AuthenticationContext. +def test_get_connection(): + site_url = "https://velux.sharepoint.com/" + credentials = { + "SHAREPOINT_CERT": { + "TENANT": "xxx", + "CLIENT_ID": "123", + "SCOPES": "https://velux.sharepoint.com/", + "THUMBPRINT": "xyz", + "PRIVATE_KEY": "private", + } + } + + spl = SharepointList(credentials=credentials) + with pytest.raises( + AttributeError, match="'SharepointList' object has no attribute 'ctx'" + ): + spl.get_connection(site_url=site_url) + + +@pytest.fixture(scope="session") +def sharepoint_list(): + """ + Fixture for creating a SharepointList class instance. + The class instance can be used within test functions to interact with Sharepoint. 
+ """ + spl = SharepointList() + yield spl + + +def test_valid_filters(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": "<", "value1": 10}, + "filter2": {"dtype": "str", "operator1": "==", "value1": "value"}, + } + result = sharepoint_list.check_filters(filters) + assert result is True + + +def test_invalid_dtype(sharepoint_list): + filters = { + "filter1": {"dtype": "list", "operator1": ">", "value1": 10}, + } + with pytest.raises(ValueError, match="dtype not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_missing_operator1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "value1": 10}, + } + with pytest.raises(ValueError, match="Operator1 is missing!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_operator1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": "*", "value1": 10}, + } + with pytest.raises(ValueError, match="Operator type not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_missing_value1(sharepoint_list): + filters = { + "filter1": {"dtype": "int", "operator1": ">", "value1": None}, + } + with pytest.raises(ValueError, match="Value for operator1 is missing!"): + sharepoint_list.check_filters(filters) + + +def test_missing_operators_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + }, + } + with pytest.raises(ValueError, match="Operators for conjuction is missing!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_operators_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "operator2": "<", + "value2": 20, + "operators_conjuction": "!", + }, + } + with pytest.raises(ValueError, match="Operators for conjuction not allowed!"): + sharepoint_list.check_filters(filters) + + +def test_invalid_filters_conjuction(sharepoint_list): + filters = { + "filter1": { + "dtype": "int", + "operator1": ">", + "value1": 10, + "filters_conjuction": "!", + }, + } + with pytest.raises( + ValueError, match="Filters operators for conjuction not allowed!" + ): + sharepoint_list.check_filters(filters) + + +def test_valid_mapping(sharepoint_list): + filters = { + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, + "filter2": {"operator1": "==", "operator2": "!=", "operators_conjuction": "|"}, + } + expected_result = { + "filter1": { + "operator1": "gt", + "operator2": "le", + "operators_conjuction": "and", + "filters_conjuction": "or", + }, + "filter2": {"operator1": "eq", "operator2": "ne", "operators_conjuction": "or"}, + } + result = sharepoint_list.operators_mapping(deepcopy(filters)) + assert result == expected_result + + +def test_invalid_comparison_operator(sharepoint_list): + filters = { + "filter1": { + "operator1": "*", + "operator2": "<=", + "operators_conjuction": "&", + "filters_conjuction": "|", + }, + } + error_message = "This comparison operator: * is not allowed. Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + + +def test_invalid_logical_operator(sharepoint_list): + filters = { + "filter1": { + "operator1": ">", + "operator2": "<=", + "operators_conjuction": "!", + "filters_conjuction": "|", + }, + } + error_message = "This conjuction(logical) operator: ! is not allowed. 
Please read the function documentation for details!" + with pytest.raises(ValueError, match=re.escape(error_message)): + sharepoint_list.operators_mapping(deepcopy(filters)) + + +def test_single_filter_datetime_api(sharepoint_list): + filters = { + "date_column": {"dtype": "datetime", "operator1": ">", "value1": "2023-01-01"} + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "date_column gt datetime'2023-01-01T00:00:00' " + assert result == expected_result + + +def test_multiple_filters_api(sharepoint_list): + filters = { + "int_column": { + "dtype": "int", + "operator1": ">=", + "value1": 10, + "operator2": "<", + "value2": 20, + }, + "str_column": {"dtype": "str", "operator1": "==", "value1": "example"}, + } + result = sharepoint_list.make_filter_for_api(filters) + expected_result = "int_column ge '10'int_column lt '20'str_column eq 'example'" + assert result == expected_result + + +def test_single_df_filter(sharepoint_list): + filters = {"column1": {"operator1": ">", "value1": 10}} + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')]" + assert result == expected_result + + +def test_multiple_df_filters(sharepoint_list): + filters = { + "column1": {"operator1": ">", "value1": 10, "filters_conjuction": "&"}, + "column2": {"operator1": "<", "value1": 20}, + } + result = sharepoint_list.make_filter_for_df(filters) + expected_result = "df.loc[(df.column1 > '10')&(df.column2 < '20')]" + assert result == expected_result diff --git a/tests/unit/test_task_utils.py b/tests/unit/test_task_utils.py index e77c24fdd..969b699a4 100644 --- a/tests/unit/test_task_utils.py +++ b/tests/unit/test_task_utils.py @@ -19,8 +19,8 @@ df_to_parquet, dtypes_to_json_task, union_dfs_task, - write_to_json, validate_df, + write_to_json, ) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index a94eaff9f..777617244 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -6,9 +6,9 @@ from viadot.signals import SKIP from viadot.utils import ( + add_viadot_metadata_columns, check_if_empty_file, gen_bulk_insert_query_from_df, - add_viadot_metadata_columns, ) EMPTY_CSV_PATH = "empty.csv" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index de2f618ab..2f30c04d8 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -11,7 +11,7 @@ from .genesys_to_adls import GenesysToADLS from .outlook_to_adls import OutlookToADLS from .salesforce_to_adls import SalesforceToADLS -from .sharepoint_to_adls import SharepointToADLS +from .sharepoint_to_adls import SharepointListToADLS, SharepointToADLS from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index df5562221..0d2deb557 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -3,12 +3,10 @@ from typing import Any, Dict, List import pendulum -from prefect import Flow, task +from prefect import Flow from prefect.backend import set_key_value from prefect.utilities import logging -logger = logging.get_logger() - from viadot.task_utils import ( add_ingestion_metadata_task, df_get_data_types_task, @@ -16,9 +14,12 @@ df_to_csv, df_to_parquet, dtypes_to_json_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload -from viadot.tasks.sharepoint import SharepointToDF +from viadot.tasks.sharepoint import SharepointListToDF, SharepointToDF + +logger = 
logging.get_logger() class SharepointToADLS(Flow): @@ -38,6 +39,7 @@ def __init__( overwrite_adls: bool = False, if_empty: str = "warn", if_exists: str = "replace", + validate_df_dict: dict = None, timeout: int = 3600, *args: List[any], **kwargs: Dict[str, Any], @@ -62,6 +64,8 @@ def __init__( Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -74,6 +78,7 @@ def __init__( self.sheet_number = sheet_number self.validate_excel_file = validate_excel_file self.timeout = timeout + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.overwrite = overwrite_adls @@ -117,6 +122,10 @@ def gen_flow(self) -> Flow: flow=self, ) + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_mapped = df_map_mixed_dtypes_for_parquet.bind( @@ -169,3 +178,188 @@ def gen_flow(self) -> Flow: @staticmethod def slugify(name): return name.replace(" ", "_").lower() + + +class SharepointListToADLS(Flow): + def __init__( + self, + name: str, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + sp_cert_credentials_secret: str = None, + vault_name: str = None, + path: str = None, + adls_dir_path: str = None, + adls_file_name: str = None, + adls_sp_credentials_secret: str = None, + overwrite_adls: bool = True, + output_file_extension: str = ".parquet", + validate_df_dict: dict = None, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Run Flow SharepointListToADLS. + + Args: + name (str): Prefect flow name. + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', # conjuction operators allowed only when 2 values passed + 'filters_conjuction':'&', # conjuction filters allowed only when 2 columns passed + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + sp_cert_credentials_secret (str): Credentials to verify Sharepoint connection. Default to None. 
+ vault_name (str): KeyVaultSecret name. Default to None. + path (str): Local file path. Default to None. + adls_dir_path (str): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, + CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to True. + + Returns: + .parquet file inside ADLS. + """ + + # SharepointListToDF + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.sp_cert_credentials_secret = sp_cert_credentials_secret + self.vault_name = vault_name + self.row_count = row_count + self.validate_df_dict = validate_df_dict + + # AzureDataLakeUpload + self.adls_dir_path = adls_dir_path + self.adls_file_name = adls_file_name + self.overwrite = overwrite_adls + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + if self.path is not None: + self.local_file_path = ( + self.path + self.slugify(name) + self.output_file_extension + ) + else: + self.local_file_path = self.slugify(name) + self.output_file_extension + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + def gen_flow(self) -> Flow: + s = SharepointListToDF( + path=self.path, + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + credentials_secret=self.sp_cert_credentials_secret, + ) + df = s.run() + + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) + validation.set_upstream(df, flow=self) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.path, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload() + file_to_adls_task.bind( + from_path=self.path, + to_path=self.adls_dir_path, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_to_json_task.bind( + dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self + ) + + json_to_adls_task = AzureDataLakeUpload() + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + 
df_mapped.set_upstream(df_with_metadata, flow=self) + dtypes_to_json_task.set_upstream(df_mapped, flow=self) + df_to_file.set_upstream(dtypes_to_json_task, flow=self) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() diff --git a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 5a104cff7..23aadee57 100644 --- a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -18,8 +18,8 @@ dtypes_to_json_task, union_dfs_task, update_dtypes_dict, - write_to_json, validate_df, + write_to_json, ) from viadot.tasks import ( AzureDataLakeUpload, diff --git a/viadot/flows/transform_and_catalog.py b/viadot/flows/transform_and_catalog.py index 1de5c4430..08ac6b895 100644 --- a/viadot/flows/transform_and_catalog.py +++ b/viadot/flows/transform_and_catalog.py @@ -1,13 +1,13 @@ import os -from pathlib import Path import shutil +from pathlib import Path from typing import Dict, List, Union from prefect import Flow, task from prefect.tasks.shell import ShellTask from prefect.triggers import any_successful -from viadot.tasks import CloneRepo, AzureKeyVaultSecret, LumaIngest +from viadot.tasks import AzureKeyVaultSecret, CloneRepo, LumaIngest @task(trigger=any_successful) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 094cec14e..c0d96abe2 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -8,7 +8,7 @@ from .outlook import Outlook from .salesforce import Salesforce from .sftp import SftpConnector -from .sharepoint import Sharepoint +from .sharepoint import Sharepoint, SharepointList from .supermetrics import Supermetrics try: diff --git a/viadot/sources/bigquery.py b/viadot/sources/bigquery.py index 1be69e866..32d1dac2c 100644 --- a/viadot/sources/bigquery.py +++ b/viadot/sources/bigquery.py @@ -6,8 +6,8 @@ from ..config import local_config from ..exceptions import CredentialError, DBDataAccessError -from .base import Source from ..utils import add_viadot_metadata_columns +from .base import Source class BigQuery(Source): diff --git a/viadot/sources/mindful.py b/viadot/sources/mindful.py index 254eecb9d..2698adb15 100644 --- a/viadot/sources/mindful.py +++ b/viadot/sources/mindful.py @@ -1,12 +1,12 @@ import os -from io import StringIO from datetime import datetime, timedelta +from io import StringIO from typing import Any, Dict, Literal, Tuple import pandas as pd import prefect -from requests.models import Response from requests.auth import HTTPBasicAuth +from requests.models import Response from viadot.exceptions import APIError from viadot.sources.base import Source diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index 61eda17f2..6e935eee2 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -1,11 +1,27 @@ -from typing import Any, Dict +from copy import deepcopy +from datetime import datetime +from fnmatch import fnmatch +from typing import Any, Dict, List +import pandas as pd import sharepy +from office365.runtime.auth.authentication_context import AuthenticationContext +from office365.runtime.client_request_exception import ClientRequestException +from office365.sharepoint.client_context import ClientContext +from prefect.utilities import logging + +from viadot.utils import get_nested_dict from ..config import local_config from 
..exceptions import CredentialError from .base import Source +logger = logging.get_logger() + +# Log how many rows were extracted in a given iteration +def log_of_progress(items): + logger.info("Items read: {0}".format(len(items))) + class Sharepoint(Source): """ @@ -64,3 +80,475 @@ def download_file( url=download_from_path, filename=download_to_path, ) + + +class SharepointList(Source): + """ + A SharepointList class to connect to and download data from Sharepoint lists. + + Args: + credentials (dict): Credentials should include: + - "tenant" + - "client_id" + - "scopes" + - "thumbprint" + - "private_key" + """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + *args, + **kwargs, + ): + DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT_CERT") + credentials = credentials or DEFAULT_CREDENTIALS + if credentials is None: + raise CredentialError("Credentials not found.") + + super().__init__(*args, credentials=credentials, **kwargs) + + def get_connection( + self, + site_url: str = None, + ): + + # Connect to Sharepoint using AuthenticationContext + try: + auth_context = AuthenticationContext(site_url) + auth_context.with_client_certificate( + tenant=self.credentials["TENANT"], + client_id=self.credentials["CLIENT_ID"], + scopes=[self.credentials["SCOPES"]], + thumbprint=self.credentials["THUMBPRINT"], + private_key=self.credentials["PRIVATE_KEY"], + ) + + self.ctx = ClientContext(site_url, auth_context) + logger.info("Successfully connected to Sharepoint Lists") + + except Exception as ex: + logger.info(f"Error at ClientContext or authentication level: {ex}") + + return self.ctx + + # Function for extracting list item values from the selected fields + def _unpack_fields( + self, + list_item, + selected_fields: dict = None, + ): + + # Create the body of the dictionary + new_dict = dict() + + # Loop scanning the properties of the selected fields + item_values_dict = list_item.properties + for field, val in item_values_dict.items(): + nested_dict = get_nested_dict(val) + # Check if the dictionary is nested + if nested_dict != None: + # It might be that there are different field properties than expected + nested_value = nested_dict.get(selected_fields["FieldProperty"]) + if nested_value != None: + new_dict[field] = nested_value + else: + logger.info("Expected field property not found.") + raise ValueError + else: + new_dict[field] = val + + return new_dict + + def get_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + ): + + ctx = self.get_connection(site_url=site_url) + + # Get the list object by its title + self.list_object = ctx.web.lists.get_by_title(list_title) + list_fields_all = self.list_object.fields + + # Get all fields or only the required ones + if required_fields is None: + ctx.load(list_fields_all) + ctx.execute_query() + + return list_fields_all + + else: + list_fields_required = [ + list_fields_all.get_by_internal_name_or_title(field).get() + for field in required_fields + ] + ctx.execute_batch() + + return list_fields_required + + def select_expandable_user_fields( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + ): + """ + Method to expand fields and get more information. + field_property to expand can be: ID, Title, FieldTypeKind, TypeAsString and many more. + -> more properties can be discovered by getting list.item.properties. 
+ Default to "Title" + """ + + list_fields = self.get_fields( + list_title=list_title, site_url=site_url, required_fields=required_fields + ) + + # Finding the "selected" fields + fields_to_select = [ + field.properties["InternalName"] + f"/{field_property}" + if fnmatch(field.properties["TypeAsString"], "User*") + else field.properties["InternalName"] + for field in list_fields + ] + # Finding the "expanded" fields + fields_to_expand = [ + field.properties["InternalName"] + for field in list_fields + if fnmatch(field.properties["TypeAsString"], f"User*") + ] + + # Creating the body of the function output + selected_fields = { + "FieldInternalNames": fields_to_select, + "FieldToExpand": fields_to_expand, + "FieldProperty": field_property, + } + + return selected_fields + + def check_filters( + self, + filters: dict = None, + ) -> bool: + """ + Function to check if filters dict is valid. + example1: if operator2 is present value2 must be in place as well + example2: if dtype is not on allowed list it will throw an error + """ + + allowed_dtypes = ["datetime", "date", "bool", "int", "float", "complex", "str"] + allowed_conjuction = ["&", "|"] + allowed_operators = ["<", ">", "<=", ">=", "==", "!="] + + for parameters in filters.values(): + if parameters.get("dtype") not in allowed_dtypes: + raise ValueError( + f"dtype not allowed! Expected {allowed_dtypes} got: {parameters.get('dtype')}." + ) + if parameters.get("operator1"): + if parameters.get("operator1") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator1')}." + ) + if not parameters.get("value1"): + raise ValueError("Value for operator1 is missing!") + elif not parameters.get("operator1"): + raise ValueError("Operator1 is missing!") + if ( + not parameters.get("operator2") + and parameters.get("operators_conjuction") is not None + ): + raise ValueError( + f"Operator conjuction allowed only with more than one filter operator!" + ) + if parameters.get("operator2"): + if parameters.get("operator2") not in allowed_operators: + raise ValueError( + f"Operator type not allowed! Expected {allowed_operators} got: {parameters.get('operator2')}." + ) + if not parameters.get("value2"): + raise ValueError("Value for operator2 is missing!") + if not parameters.get("operators_conjuction"): + raise ValueError( + f"Operators for conjuction is missing! Expected {allowed_conjuction} got empty." + ) + if parameters.get("operators_conjuction") not in allowed_conjuction: + raise ValueError( + f"Operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('operators_conjuction')}." + ) + if parameters.get("filters_conjuction"): + if ( + len(filters.keys()) == 1 + and parameters.get("filters_conjuction") is not None + ): + raise ValueError( + f"Filters conjuction allowed only with more than one filter column!" + ) + if parameters.get("filters_conjuction") not in allowed_conjuction: + raise ValueError( + f"Filters operators for conjuction not allowed! Expected {allowed_conjuction} got {parameters.get('filters_conjuction')}." + ) + + return True + + def operators_mapping( + self, + filters: dict = None, + ) -> dict: + """ + Function for mapping comparison and conjuction(logical) operators of filters to the format which is recognized by Microsoft API. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + New modified dict. 
+ """ + + filters_dict = deepcopy(filters) + operators = { + "<": "lt", + ">": "gt", + "<=": "le", + ">=": "ge", + "==": "eq", + "!=": "ne", + } + logical_op = {"&": "and", "|": "or"} + + for parameters in filters_dict.values(): + if parameters.get("operator1"): + operator1_to_change = parameters.get("operator1") + if operator1_to_change in operators.keys(): + parameters["operator1"] = operators[operator1_to_change] + else: + raise ValueError( + f"This comparison operator: {operator1_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operator2"): + operator2_to_change = parameters.get("operator2") + if operator2_to_change in operators.keys(): + parameters["operator2"] = operators[operator2_to_change] + else: + raise ValueError( + f"This comparison operator: {operator2_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("operators_conjuction"): + logical_op_to_change = parameters.get("operators_conjuction") + if logical_op_to_change in logical_op.keys(): + parameters["operators_conjuction"] = logical_op[ + logical_op_to_change + ] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_op_to_change} is not allowed. Please read the function documentation for details!" + ) + if parameters.get("filters_conjuction"): + logical_fl_to_change = parameters.get("filters_conjuction") + if logical_fl_to_change in logical_op.keys(): + parameters["filters_conjuction"] = logical_op[logical_fl_to_change] + else: + raise ValueError( + f"This conjuction(logical) operator: {logical_fl_to_change} is not allowed. Please read the function documentation for details!" + ) + + return filters_dict + + def make_filter_for_api(self, filters: dict) -> "str": + """ + Function changing type of operators to match MS API style as 'str' passing to URL call. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter parameter to API. + """ + + filter_text = "" + filters_mod = self.operators_mapping(filters) + + for column, parameters in filters_mod.items(): + if parameters.get("dtype") in ["datetime", "date"]: + from_date1 = datetime.strptime( + parameters.get("value1"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} datetime'{from_date1}' " + ) + if parameters.get("operator2"): + from_date2 = datetime.strptime( + parameters.get("value2"), "%Y-%m-%d" + ).isoformat() + filter_text = ( + filter_text + + f" {parameters.get('operators_conjuction')} {column} {parameters.get('operator2')} datetime'{from_date2}' " + ) + elif parameters.get("dtype") not in ["datetime", "date"]: + filter_text = ( + filter_text + + f"{column} {parameters.get('operator1')} '{parameters.get('value1')}'" + ) + if parameters.get("operator2"): + filter_text = ( + filter_text + + f"{column} {parameters.get('operator2')} '{parameters.get('value2')}'" + ) + if parameters.get("filters_conjuction"): + filter_text = filter_text + f"{parameters.get('filters_conjuction')} " + + return filter_text + + def make_filter_for_df( + self, + filters: dict = None, + ) -> "str": + """ + Function changing dict operators into pandas DataFrame filters. + + Args: + filters (dict): A dictionar which contains operators. + + Returns: + Output as string to pass as filter to DataFrame. 
+ """ + + filter_in_df = "df.loc[" + + for column, parameters in filters.items(): + filter_in_df = ( + filter_in_df + + f"(df.{column} {parameters.get('operator1', '')} '{parameters.get('value1', '')}'" + ) + + if parameters.get("operator2"): + filter_in_df = ( + filter_in_df + + f") {parameters.get('operators_conjuction')} (df.{column} {parameters.get('operator2', '')} '{parameters.get('value2', '')}'" + ) + + if parameters.get("filters_conjuction"): + filter_in_df = filter_in_df + ")" + parameters.get("filters_conjuction") + + else: + filter_in_df = filter_in_df + ")" + + filter_in_df = filter_in_df + "]" + + return filter_in_df + + def list_item_to_df( + self, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + ): + """ + Method to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. + + Returns: + pd.DataFrame + """ + + # checking if the passed filters dictionary is correctly built + if filters is not None: + self.check_filters(filters) + # checking that the filter parameters are included in the desired field parameters + for key in filters: + if key not in required_fields: + raise AttributeError( + f"Filter '{key}' not included inside required fields. It is obligatory to extract data which is filtered!" + ) + + # changing the body of the filter for MS API call + filter_text = self.make_filter_for_api(filters) + + download_all = False + + # extracting requeird_fields SP_List objects + selected_fields = self.select_expandable_user_fields( + list_title=list_title, + site_url=site_url, + required_fields=required_fields, + field_property=field_property, + ) + + try: + # Extract data below 5k rows or max limitation of the specific SP List with basic filtering. + if filters is None: + raise ValueError("There is no filter. Starting extraxction all data") + else: + list_items = ( + self.list_object.items.filter(filter_text) + .select(selected_fields["FieldInternalNames"]) + .top(row_count) + .expand(selected_fields["FieldToExpand"]) + ) + self.ctx.load(list_items) + self.ctx.execute_query() + + except (ClientRequestException, ValueError) as e: + # Extract all data from specific SP List without basic filtering. Additional logic for filtering applied on DataFreame level. 
+ logger.info(f"Exception SPQueryThrottledException occurred: {e}") + list_items = ( + self.list_object.items.get_all(row_count, log_of_progress) + .select(selected_fields["FieldInternalNames"]) + .expand(selected_fields["FieldToExpand"]) + ) + self.ctx.load(list_items) + self.ctx.execute_query() + download_all = True + + df = pd.DataFrame( + [self._unpack_fields(row_item, selected_fields) for row_item in list_items] + ) + + if download_all == True and filters is not None: + # Filter for desired range of created date and for factory Namyslow PL + self.logger.info("Filtering df with all data output") + filter_for_df = self.make_filter_for_df(filters) + df = eval(filter_for_df) + + return df diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 6173e2994..6a532f932 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -1,8 +1,8 @@ import copy import json import os -import shutil import re +import shutil from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 02791852b..35543045a 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .outlook import OutlookToDF from .prefect_date_range import GetFlowNewDateRange from .salesforce import SalesforceBulkUpsert, SalesforceToDF, SalesforceUpsert -from .sharepoint import SharepointToDF +from .sharepoint import SharepointListToDF, SharepointToDF from .sqlite import SQLiteInsert, SQLiteQuery, SQLiteSQLtoDF from .supermetrics import SupermetricsToCSV, SupermetricsToDF @@ -50,11 +50,11 @@ from .duckdb import DuckDBCreateTableFromParquet, DuckDBQuery, DuckDBToDF from .epicor import EpicorOrdersToDF from .eurostat import EurostatToDF +from .git import CloneRepo from .hubspot import HubspotToDF +from .luma import LumaIngest from .mediatool import MediatoolToDF from .mindful import MindfulToCSV from .sftp import SftpList, SftpToDF from .sql_server import SQLServerCreateTable, SQLServerQuery, SQLServerToDF from .vid_club import VidClubToDF -from .git import CloneRepo -from .luma import LumaIngest \ No newline at end of file diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index de47ddebf..dda6c4dde 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,10 +10,10 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs -from viadot.task_utils import * from viadot.exceptions import APIError from viadot.sources import Genesys +from viadot.task_utils import * logger = logging.get_logger() diff --git a/viadot/tasks/luma.py b/viadot/tasks/luma.py index 5b78ebc27..11eb91e45 100644 --- a/viadot/tasks/luma.py +++ b/viadot/tasks/luma.py @@ -1,5 +1,7 @@ import json + from prefect.tasks.shell import ShellTask + from .azure_key_vault import AzureKeyVaultSecret diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py index acc92c246..0d8d7b2e3 100644 --- a/viadot/tasks/sap_bw.py +++ b/viadot/tasks/sap_bw.py @@ -1,12 +1,12 @@ import pandas as pd from prefect import Task from prefect.tasks.secrets import PrefectSecret -from viadot.tasks import AzureKeyVaultSecret from prefect.utilities import logging from viadot.exceptions import ValidationError from viadot.sources import SAPBW from viadot.task_utils import * +from viadot.tasks import AzureKeyVaultSecret logger = logging.get_logger() diff --git a/viadot/tasks/sharepoint.py 
b/viadot/tasks/sharepoint.py index 7ba9c4d41..c4d670617 100644 --- a/viadot/tasks/sharepoint.py +++ b/viadot/tasks/sharepoint.py @@ -1,6 +1,7 @@ import copy import json import os +import re from typing import List import pandas as pd @@ -10,9 +11,9 @@ from prefect.utilities.tasks import defaults_from_attrs from ..exceptions import ValidationError -from ..sources import Sharepoint -from .azure_key_vault import AzureKeyVaultSecret +from ..sources import Sharepoint, SharepointList from ..utils import add_viadot_metadata_columns +from .azure_key_vault import AzureKeyVaultSecret logger = logging.get_logger() @@ -230,3 +231,165 @@ def run( df = self.df_replace_special_chars(df) self.logger.info(f"Successfully converted data to a DataFrame.") return df + + +class SharepointListToDF(Task): + """ + Task to extract data from Sharepoint List into DataFrame. + + Args: + list_title (str): Title of Sharepoint List. Default to None. + site_url (str): URL to set of Sharepoint Lists. Default to None. + required_fields (List[str]): Required fields(columns) need to be extracted from + Sharepoint List. Default to None. + field_property (List[str]): Property to expand with expand query method. + All propertys can be found under list.item.properties. + Default to ["Title"] + filters (dict): Dictionary with operators which filters the SharepointList output. + allowed dtypes: ('datetime','date','bool','int', 'float', 'complex', 'str') + allowed conjuction: ('&','|') + allowed operators: ('<','>','<=','>=','==','!=') + Example how to build the dict: + filters = { + 'Column_name_1' : + { + 'dtype': 'datetime', + 'value1':'YYYY-MM-DD', + 'value2':'YYYY-MM-DD', + 'operator1':'>=', + 'operator2':'<=', + 'operators_conjuction':'&', + 'filters_conjuction':'&', + } + , + 'Column_name_2' : + { + 'dtype': 'str', + 'value1':'NM-PL', + 'operator1':'==', + }, + } + row_count (int): Number of downloaded rows in single request. Default to 5000. 
+ + Returns: + pandas DataFrame + """ + + def __init__( + self, + path: str = None, + list_title: str = None, + site_url: str = None, + required_fields: List[str] = None, + field_property: str = "Title", + filters: dict = None, + row_count: int = 5000, + credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + + self.path = path + self.list_title = list_title + self.site_url = site_url + self.required_fields = required_fields + self.field_property = field_property + self.filters = filters + self.row_count = row_count + self.vault_name = vault_name + self.credentials_secret = credentials_secret + + if not credentials_secret: + # Attempt to read a default for the service principal secret name + try: + credentials_secret = PrefectSecret("SHAREPOINT-CERT").run() + except ValueError: + pass + + if credentials_secret: + credentials_str = AzureKeyVaultSecret( + secret=self.credentials_secret, vault_name=self.vault_name + ).run() + self.credentials = json.loads(credentials_str) + + super().__init__( + *args, + **kwargs, + ) + + def __call__(self): + """Download Sharepoint_List data to a .parquet file""" + super().__call__(self) + + def _convert_camel_case_to_words(self, input_str: str) -> str: + + self.input_str = input_str + + words = re.findall(r"[A-Z][a-z]*|[0-9]+", self.input_str) + converted = " ".join(words) + + return converted + + def change_column_name( + self, + df: pd.DataFrame = None, + ): + s = SharepointList() + list_fields = s.get_fields( + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + ) + + self.logger.info("Changing columns names") + column_names_correct = [field.properties["Title"] for field in list_fields] + column_names_code = [field.properties["InternalName"] for field in list_fields] + dictionary = dict(zip(column_names_code, column_names_correct)) + + # If duplicates in names from "Title" take "InternalName" + value_count = {} + duplicates = [] + + for key, value in dictionary.items(): + if value in value_count: + if value_count[value] not in duplicates: + duplicates.append(value_count[value]) + duplicates.append(key) + else: + value_count[value] = key + + for key in duplicates: + dictionary[key] = self._convert_camel_case_to_words(key) + + # Rename columns names inside DataFrame + df = df.rename(columns=dictionary) + + return df + + def run( + self, + ) -> None: + """ + Run Task SharepointListToDF. + + Returns: + pd.DataFrame + """ + + s = SharepointList( + credentials=self.credentials, + ) + df_raw = s.list_item_to_df( + list_title=self.list_title, + site_url=self.site_url, + required_fields=self.required_fields, + field_property=self.field_property, + filters=self.filters, + row_count=self.row_count, + ) + + df = self.change_column_name(df=df_raw) + self.logger.info("Successfully changed structure of the DataFrame") + + return df diff --git a/viadot/utils.py b/viadot/utils.py index 2b4c80538..d05cfdd95 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -449,3 +449,14 @@ def wrapper(*args, **kwargs) -> pd.DataFrame: return wrapper return decorator + + +def get_nested_dict(d): + if isinstance(d, dict): + for lvl in d.values(): + if isinstance(lvl, dict): + return get_nested_dict(lvl) + else: + return d + else: + return None
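For reference, a minimal, hypothetical usage sketch of the new `SharepointListToADLS` flow with the `filters` dictionary documented above. The flow name, list title, site URL, secret names, and paths below are placeholders rather than values taken from this change, and the exact behaviour depends on the flow as merged.

```python
from viadot.flows import SharepointListToADLS

# Filter layout follows the structure documented in SharepointList.list_item_to_df:
# one entry per column, with a dtype, comparison operator(s), value(s), and optional
# conjunction keys ('operators_conjuction', 'filters_conjuction').
filters = {
    "Created": {
        "dtype": "datetime",
        "operator1": ">=",
        "value1": "2023-01-01",
        "filters_conjuction": "&",  # join this filter with the next column's filter
    },
    "Factory": {
        "dtype": "str",
        "operator1": "==",
        "value1": "NM-PL",
    },
}
# Based on the new tests, make_filter_for_api should translate this to roughly:
# "Created ge datetime'2023-01-01T00:00:00' and Factory eq 'NM-PL'"

flow = SharepointListToADLS(
    name="sharepoint list to adls",                       # placeholder flow name
    list_title="My List",                                 # placeholder list title
    site_url="https://example.sharepoint.com/sites/x/",   # placeholder site URL
    required_fields=["Created", "Factory", "Title"],      # filtered columns must be listed here
    field_property="Title",
    filters=filters,
    row_count=5000,
    sp_cert_credentials_secret="SHAREPOINT-CERT",         # placeholder Key Vault secret name
    path="sharepoint_list_extract.parquet",               # local output file
    adls_dir_path="raw/tests/",                           # placeholder ADLS directory
    adls_file_name="sharepoint_list_extract.parquet",
    adls_sp_credentials_secret="ADLS-SP-CREDENTIALS",     # placeholder secret name
)
flow.run()
```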