diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a68d95ac..850ac7c67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,43 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
 ## [Unreleased]
 
+## [0.2.12]
+### Added
+- Added `Sharepoint` source
+- Added `SharepointToDF` task
+- Added `SharepointToADLS` flow
+- Added `CloudForCustomers` source
+- Added `c4c_report_to_df` task
+- Added `c4c_to_df` task
+- Added `CloudForCustomersReportToADLS` flow
+- Added `df_to_csv` task to task_utils.py
+- Added `df_to_parquet` task to task_utils.py
+- Added `dtypes_to_json` task to task_utils.py
+## [0.2.11]
+### Fixed
+- `ADLSToAzureSQL` - fixed path to csv issue.
+- `SupermetricsToADLS` - fixed local json path issue.
+
+## [0.2.10] - 2021-10-29
+### Release due to CI/CD error
+
+## [0.2.9] - 2021-10-29
+### Release due to CI/CD error
+
+## [0.2.8] - 2021-10-29
+### Changed
+- CI/CD: `dev` image is now only published on push to the `dev` branch
+- Docker:
+  - updated registry links to use the new `ghcr.io` domain
+  - `run.sh` now also accepts the `-t` option. When run in standard mode, it will only spin up the `viadot_jupyter_lab` service.
+  When run with `-t dev`, it will also spin up `viadot_testing` and `viadot_docs` containers.
+
+### Fixed
+- `ADLSToAzureSQL` - fixed path parameter issue.
+
 ## [0.2.11]
 ### Fixed
 - ADLSToAzureSQL - fixed path to csv issue.
diff --git a/requirements.txt b/requirements.txt
index b02384194..2d8fbcead 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,4 @@ PyGithub==1.55
 Shapely==1.8.0
 imagehash==4.2.1
 visions==0.7.4
+sharepy==1.3.0
\ No newline at end of file
diff --git a/tests/integration/flows/test_cloud_for_customers_report_to_adls.py b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py
new file mode 100644
index 000000000..553a161bf
--- /dev/null
+++ b/tests/integration/flows/test_cloud_for_customers_report_to_adls.py
@@ -0,0 +1,29 @@
+from viadot.flows import CloudForCustomersReportToADLS
+from viadot.config import local_config
+
+
+def test_cloud_for_customers_report_to_adls():
+    credentials = local_config.get("CLOUD_FOR_CUSTOMERS")
+    credentials_prod = credentials["Prod"]
+    channels = ["VEL_B_AFS", "VEL_B_ASA"]
+    month = ["01"]
+    year = ["2021"]
+    flow = CloudForCustomersReportToADLS(
+        report_url=credentials_prod["server"],
+        env="Prod",
+        channels=channels,
+        months=month,
+        years=year,
+        name="test_c4c_report_to_adls",
+        local_file_path="test_c4c_report_to_adls.csv",
+        adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"],
+        adls_dir_path=credentials["adls_dir_path"],
+    )
+    number_of_urls = len(month) * len(year) * len(channels)
+    assert len(flow.report_urls_with_filters) == number_of_urls
+
+    result = flow.run()
+    assert result.is_successful()
+
+    task_results = result.result.values()
+    assert all([task_result.is_successful() for task_result in task_results])
diff --git a/tests/integration/flows/test_cloud_for_customers_to_adls.py b/tests/integration/flows/test_cloud_for_customers_to_adls.py
deleted file mode 100644
index 70fa99ce6..000000000
--- a/tests/integration/flows/test_cloud_for_customers_to_adls.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import logging
-import os
-
-import pytest
-from prefect.storage import Local
-from viadot.flows import CloudForCustomersToADLS
-from viadot.config import local_config
-
-
-def test_cloud_for_customers_to_adls(): - credentials = local_config.get("CLOUD_FOR_CUSTOMERS") - flow = CloudForCustomersToADLS( - url="http://services.odata.org/V2/Northwind/Northwind.svc/", - endpoint="Employees", - name="test_c4c_adls", - adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"], - adls_dir_path=credentials["adls_dir_path"], - ) - result = flow.run() - assert result.is_successful() diff --git a/tests/integration/test_cloud_for_customers.py b/tests/integration/test_cloud_for_customers.py index bbe24d7ec..46fe71854 100644 --- a/tests/integration/test_cloud_for_customers.py +++ b/tests/integration/test_cloud_for_customers.py @@ -13,7 +13,9 @@ def cloud_for_customers(): url = "http://services.odata.org/V2/Northwind/Northwind.svc/" endpoint = "Employees" - cloud_for_customers = CloudForCustomers(url=url, endpoint=endpoint) + cloud_for_customers = CloudForCustomers( + url=url, endpoint=endpoint, params={"$top": "2"} + ) yield cloud_for_customers os.remove(TEST_FILE_1) @@ -36,10 +38,10 @@ def test_csv(cloud_for_customers): def test_credentials(): - credentials = local_config.get("CLOUD_FOR_CUSTOMERS") - url = credentials["server"] + qa_credentials = local_config.get("CLOUD_FOR_CUSTOMERS")["QA"] + url = qa_credentials["server"] endpoint = "ServiceRequestCollection" - c4c = CloudForCustomers(url=url, endpoint=endpoint) + c4c = CloudForCustomers(url=url, endpoint=endpoint, params={"$top": "2"}) df = c4c.to_df( fields=["ProductRecipientPartyName", "CreationDateTime", "CreatedBy"] ) diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py new file mode 100644 index 000000000..9db07c105 --- /dev/null +++ b/tests/integration/test_sharepoint.py @@ -0,0 +1,61 @@ +import pytest +import os +import pathlib +import pandas as pd +from typing import List + +from viadot.sources import Sharepoint +from viadot.flows import SharepointToADLS as s_flow +from viadot.config import local_config +from viadot.task_utils import df_get_data_types_task + +s = Sharepoint() + +FILE_NAME = "EUL Data.xlsm" +s.download_file(download_to_path=FILE_NAME) +DF = pd.read_excel(FILE_NAME, sheet_name=0) + + +def test_credentials(): + credentials = {"site": "tenant.sharepoint.com", "username": "User"} + s = Sharepoint(credentials=credentials) + with pytest.raises(ValueError, match="Missing credentials."): + s.get_connection() + + +def test_connection(): + credentials = local_config.get("SHAREPOINT") + site = f'https://{credentials["site"]}' + conn = s.get_connection() + response = conn.get(site) + assert response.status_code == 200 + + +def test_file_extension(): + file_ext = [".xlsm", ".xlsx"] + assert pathlib.Path(s.download_from_path).suffix in file_ext + + +def test_file_name(): + assert os.path.basename(s.download_from_path) == FILE_NAME + + +def test_file_download(): + s.download_file(download_to_path=FILE_NAME) + files = [] + for file in os.listdir(): + if os.path.isfile(os.path.join(file)): + files.append(file) + assert FILE_NAME in files + os.remove(FILE_NAME) + + +def test_file_to_df(): + df_test = pd.DataFrame(data={"col1": [1, 2]}) + assert type(DF) == type(df_test) + + +def test_get_data_types(): + dtypes_map = df_get_data_types_task.run(DF) + dtypes = [v for k, v in dtypes_map.items()] + assert "String" in dtypes diff --git a/tests/test_viadot.py b/tests/test_viadot.py index 1a7dc9444..1ca4a5ec6 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.2.11" + assert __version__ == "0.2.12" diff 
--git a/tests/unit/test_task_utils.py b/tests/unit/test_task_utils.py new file mode 100644 index 000000000..f4bc977ee --- /dev/null +++ b/tests/unit/test_task_utils.py @@ -0,0 +1,34 @@ +import pytest +import pandas as pd +from typing import List + +from viadot.task_utils import df_get_data_types_task, df_map_mixed_dtypes_for_parquet + + +def count_dtypes(dtypes_dict: dict = None, dtypes_to_count: List[str] = None) -> int: + dtypes_counter = 0 + for v in dtypes_dict.values(): + if v in dtypes_to_count: + dtypes_counter += 1 + return dtypes_counter + + +def test_map_dtypes_for_parquet(): + df = pd.DataFrame( + { + "a": {0: 55.7, 1: "Hello", 2: "Hello"}, + "b": {0: "Start", 1: "Hello", 2: "Hello"}, + "w": {0: 679, 1: "Hello", 2: "Hello"}, + "x": {0: 1, 1: 2, 2: 444}, + "y": {0: 1.5, 1: 11.97, 2: 56.999}, + "z": {0: "Start", 1: 1, 2: "2021-01-01"}, + } + ) + dtyps_dict = df_get_data_types_task.run(df) + sum_of_dtypes = count_dtypes(dtyps_dict, ["Object", "String"]) + + df_map = df_map_mixed_dtypes_for_parquet.run(df, dtyps_dict) + dtyps_dict_mapped = df_get_data_types_task.run(df_map) + sum_of_mapped_dtypes = count_dtypes(dtyps_dict_mapped, ["String"]) + + assert sum_of_dtypes == sum_of_mapped_dtypes diff --git a/viadot/__init__.py b/viadot/__init__.py index 5635676f6..b5c9b6cb7 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.2.11" +__version__ = "0.2.12" diff --git a/viadot/exceptions.py b/viadot/exceptions.py new file mode 100644 index 000000000..1af061068 --- /dev/null +++ b/viadot/exceptions.py @@ -0,0 +1,6 @@ +class ValidationError(Exception): + pass + + +class APIError(Exception): + pass diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index b4886f02c..122016a08 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -7,4 +7,5 @@ from .supermetrics_to_adls import SupermetricsToADLS from .supermetrics_to_azure_sql import SupermetricsToAzureSQL from .adls_container_to_container import ADLSContainerToContainer -from .cloud_for_customers_to_adls import CloudForCustomersToADLS +from .sharepoint_to_adls import SharepointToADLS +from .cloud_for_customers_report_to_adls import CloudForCustomersReportToADLS diff --git a/viadot/flows/cloud_for_customers_report_to_adls.py b/viadot/flows/cloud_for_customers_report_to_adls.py new file mode 100644 index 000000000..bab14e8e0 --- /dev/null +++ b/viadot/flows/cloud_for_customers_report_to_adls.py @@ -0,0 +1,212 @@ +import os +from typing import Any, Dict, List, Union + +import pendulum +from prefect import Flow, Task, apply_map +from prefect.backend import set_key_value + +from ..task_utils import ( + add_ingestion_metadata_task, + df_to_csv, + df_to_parquet, + union_dfs_task, +) +from ..tasks import AzureDataLakeUpload, c4c_report_to_df, c4c_to_df + +file_to_adls_task = AzureDataLakeUpload() + + +class CloudForCustomersReportToADLS(Flow): + def __init__( + self, + report_url: str = None, + url: str = None, + env: str = "QA", + endpoint: str = None, + params: Dict[str, Any] = {}, + fields: List[str] = None, + name: str = None, + adls_sp_credentials_secret: str = None, + local_file_path: str = None, + output_file_extension: str = ".csv", + adls_dir_path: str = None, + if_empty: str = "warn", + if_exists: str = "replace", + skip: int = 0, + top: int = 1000, + channels: List[str] = None, + months: List[str] = None, + years: List[str] = None, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Flow for downloading data from different marketing APIs to a local CSV + using Cloud 
for Customers API, then uploading it to Azure Data Lake. + + Args: + report_url (str, optional): The url to the API. Defaults to None. + name (str): The name of the flow. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. + Defaults to None. + local_file_path (str, optional): Local destination path. Defaults to None. + output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy + to handle with parquet. Defaults to ".csv". + adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. + if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn". + if_exists (str, optional): What to do if the table already exists. Defaults to "replace". + skip (int, optional): Initial index value of reading row. + top (int, optional): The value of top reading row. + channels (List[str], optional): Filtering parameters passed to the url. + months (List[str], optional): Filtering parameters passed to the url. + years (List[str], optional): Filtering parameters passed to the url. + """ + + self.if_empty = if_empty + self.report_url = report_url + self.env = env + self.skip = skip + self.top = top + # AzureDataLakeUpload + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.if_exists = if_exists + self.output_file_extension = output_file_extension + self.local_file_path = ( + local_file_path or self.slugify(name) + self.output_file_extension + ) + self.now = str(pendulum.now("utc")) + self.adls_dir_path = adls_dir_path + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + # in case of non-report invoking + self.url = url + self.endpoint = endpoint + self.params = params + self.fields = fields + # filtering for report_url for reports + self.channels = channels + self.months = months + self.years = years + + self.report_urls_with_filters = [self.report_url] + + self.report_urls_with_filters = self.create_url_with_fields( + fields_list=self.channels, filter_code="CCHANNETZTEXT12CE6C2FA0D77995" + ) + + self.report_urls_with_filters = self.create_url_with_fields( + fields_list=self.months, filter_code="CMONTH_ID" + ) + + self.report_urls_with_filters = self.create_url_with_fields( + fields_list=self.years, filter_code="CYEAR_ID" + ) + + super().__init__(*args, name=name, **kwargs) + + self.gen_flow() + + def create_url_with_fields(self, fields_list: List[str], filter_code: str) -> List: + urls_list_result = [] + add_filter = True + if len(self.report_urls_with_filters) > 1: + add_filter = False + + if fields_list: + for url in self.report_urls_with_filters: + for field in fields_list: + if add_filter: + new_url = f"{url}&$filter=({filter_code}%20eq%20%27{field}%27)" + elif not add_filter: + new_url = f"{url}%20and%20({filter_code}%20eq%20%27{field}%27)" + urls_list_result.append(new_url) + return urls_list_result + else: + return self.report_urls_with_filters + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() + + def gen_c4c( + self, + url: str, + report_url: str, + endpoint: str, + params: str, + env: str, + flow: Flow = None, + ) -> Task: + + df = c4c_to_df.bind( + url=url, + env=env, + endpoint=endpoint, + params=params, + report_url=report_url, + flow=flow, + ) + + return df + + def gen_c4c_report_months( + self, 
report_urls_with_filters: Union[str, List[str]], flow: Flow = None + ) -> Task: + + report = c4c_report_to_df.bind( + skip=self.skip, + top=self.top, + report_url=report_urls_with_filters, + env=self.env, + flow=flow, + ) + + return report + + def gen_flow(self) -> Flow: + if self.report_url: + dfs = apply_map( + self.gen_c4c_report_months, self.report_urls_with_filters, flow=self + ) + df = union_dfs_task.bind(dfs, flow=self) + elif self.url: + df = self.gen_c4c( + url=self.url, + report_url=self.report_url, + env=self.env, + endpoint=self.endpoint, + params=self.params, + flow=self, + ) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + + if self.output_file_extension == ".parquet": + df_to_file = df_to_parquet.bind( + df=df_with_metadata, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + else: + df_to_file = df_to_csv.bind( + df=df_with_metadata, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + + file_to_adls_task.bind( + from_path=self.local_file_path, + to_path=self.adls_file_path, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + df_with_metadata.set_upstream(df, flow=self) + df_to_file.set_upstream(df_with_metadata, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) + + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/cloud_for_customers_to_adls.py b/viadot/flows/cloud_for_customers_to_adls.py deleted file mode 100644 index ec9e74f7c..000000000 --- a/viadot/flows/cloud_for_customers_to_adls.py +++ /dev/null @@ -1,136 +0,0 @@ -import json -import os -import shutil -from pathlib import Path -from typing import Any, Dict, List, Union - -import pandas as pd -import pendulum -import prefect -from prefect import Flow, Task, apply_map, task -from prefect.backend import set_key_value -from prefect.tasks.secrets import PrefectSecret -from prefect.utilities import logging -from visions.functional import infer_type -from visions.typesets.complete_set import CompleteSet - -from ..task_utils import add_ingestion_metadata_task -from ..tasks import ( - AzureDataLakeUpload, - CloudForCustomersToDF, -) - -logger = logging.get_logger(__name__) - -cloud_for_customers_to_df_task = CloudForCustomersToDF() -file_to_adls_task = AzureDataLakeUpload() - - -@task -def df_to_csv_task(df, path: str, if_exists: str = "replace"): - if if_exists == "append": - if os.path.isfile(path): - csv_df = pd.read_csv(path) - out_df = pd.concat([csv_df, df]) - else: - out_df = df - elif if_exists == "replace": - out_df = df - out_df.to_csv(path, index=False) - - -@task -def df_to_parquet_task(df, path: str, if_exists: str = "replace"): - if if_exists == "append": - if os.path.isfile(path): - parquet_df = pd.read_parquet(path) - out_df = pd.concat([parquet_df, df]) - else: - out_df = df - elif if_exists == "replace": - out_df = df - out_df.to_parquet(path, index=False) - - -class CloudForCustomersToADLS(Flow): - def __init__( - self, - url: str = None, - endpoint: str = None, - name: str = None, - params: Dict[str, Any] = {}, - adls_sp_credentials_secret: str = None, - fields: List[str] = None, - local_file_path: str = None, - output_file_extension: str = ".csv", - adls_dir_path: str = None, - if_empty: str = "warn", - if_exists: str = "replace", - *args: List[any], - **kwargs: Dict[str, Any], - ): - - # CloudForCustomersToDF - self.if_empty = if_empty - self.url = url - self.endpoint = endpoint - self.fields = fields - self.params = params - - # AzureDataLakeUpload 
- self.adls_sp_credentials_secret = adls_sp_credentials_secret - self.if_exists = if_exists - self.output_file_extension = output_file_extension - self.local_file_path = ( - local_file_path or self.slugify(name) + self.output_file_extension - ) - self.now = str(pendulum.now("utc")) - self.adls_dir_path = adls_dir_path - self.adls_file_path = os.path.join( - adls_dir_path, self.now + self.output_file_extension - ) - - super().__init__(*args, name=name, **kwargs) - - self.gen_flow() - - def gen_flow(self) -> Flow: - - df = cloud_for_customers_to_df_task.bind( - url=self.url, - endpoint=self.endpoint, - fields=self.fields, - params=self.params, - flow=self, - ) - - df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) - - if self.output_file_extension == ".parquet": - df_to_file = df_to_parquet_task.bind( - df=df_with_metadata, - path=self.local_file_path, - if_exists=self.if_exists, - flow=self, - ) - else: - df_to_file = df_to_csv_task.bind( - df=df_with_metadata, - path=self.local_file_path, - if_exists=self.if_exists, - flow=self, - ) - - file_to_adls_task.bind( - from_path=self.local_file_path, - to_path=self.adls_file_path, - sp_credentials_secret=self.adls_sp_credentials_secret, - flow=self, - ) - - file_to_adls_task.set_upstream(df_to_file, flow=self) - set_key_value(key=self.adls_dir_path, value=self.adls_file_path) - - @staticmethod - def slugify(name): - return name.replace(" ", "_").lower() diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py new file mode 100644 index 000000000..3a42cacb5 --- /dev/null +++ b/viadot/flows/sharepoint_to_adls.py @@ -0,0 +1,154 @@ +import os +from typing import Any, Dict, List +import pendulum +from prefect import Flow, task +from prefect.backend import set_key_value +from prefect.utilities import logging + +logger = logging.get_logger() + +from ..task_utils import ( + df_get_data_types_task, + add_ingestion_metadata_task, + df_to_csv, + df_to_parquet, + dtypes_to_json, + df_map_mixed_dtypes_for_parquet, +) +from ..tasks import AzureDataLakeUpload +from ..tasks.sharepoint import SharepointToDF + +excel_to_df_task = SharepointToDF() +file_to_adls_task = AzureDataLakeUpload() +json_to_adls_task = AzureDataLakeUpload() + + +class SharepointToADLS(Flow): + def __init__( + self, + name: str = None, + nrows_to_df: int = None, + path_to_file: str = None, + url_to_file: str = None, + sheet_number: int = None, + validate_excel_file: bool = False, + output_file_extension: str = ".csv", + local_dir_path: str = None, + adls_dir_path: str = None, + adls_sp_credentials_secret: str = None, + if_empty: str = "warn", + if_exists: str = "replace", + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Flow for downloading Excel file from Sharepoint then uploading it to Azure Data Lake. + + Args: + name (str, optional): The name of the flow. Defaults to None. + nrows_to_df (int, optional): Number of rows to read at a time. Defaults to 50000. Defaults to None. + path_to_file (str, optional): Path to local Excel file. Defaults to None. + url_to_file (str, optional): Link to a file on Sharepoint. + (e.g : https://{tenant_name}.sharepoint.com/sites/{folder}/Shared%20Documents/Dashboard/file). Defaults to None. + sheet_number (int, optional): Sheet number to be extracted from file. Counting from 0, if None all sheets are axtracted. Defaults to None. + validate_excel_file (bool, optional): Check if columns in separate sheets are the same. Defaults to False. 
+ output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".csv". + local_dir_path (str, optional): File directory. Defaults to None. + adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. + Defaults to None. + if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + """ + # SharepointToDF + self.if_empty = if_empty + self.nrows = nrows_to_df + self.path_to_file = path_to_file + self.url_to_file = url_to_file + self.local_dir_path = local_dir_path + self.sheet_number = sheet_number + self.validate_excel_file = validate_excel_file + + # AzureDataLakeUpload + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.if_exists = if_exists + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + if self.local_dir_path is not None: + self.local_file_path = ( + self.local_dir_path + self.slugify(name) + self.output_file_extension + ) + else: + self.local_file_path = self.slugify(name) + self.output_file_extension + self.local_json_path = self.slugify(name) + ".json" + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + self.adls_dir_path = adls_dir_path + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + + super().__init__(*args, name=name, **kwargs) + + self.gen_flow() + + def gen_flow(self) -> Flow: + df = excel_to_df_task.bind( + path_to_file=self.path_to_file, + url_to_file=self.url_to_file, + nrows=self.nrows, + sheet_number=self.sheet_number, + validate_excel_file=self.validate_excel_file, + flow=self, + ) + + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + if self.output_file_extension == ".parquet": + + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + else: + df_to_file = df_to_csv.bind( + df=df_with_metadata, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + + file_to_adls_task.bind( + from_path=self.local_file_path, + to_path=self.adls_file_path, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_to_json.bind( + dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self + ) + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + df_mapped.set_upstream(df_with_metadata, flow=self) + dtypes_to_json.set_upstream(df_mapped, flow=self) + df_to_file.set_upstream(dtypes_to_json, flow=self) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() diff --git a/viadot/flows/supermetrics_to_adls.py b/viadot/flows/supermetrics_to_adls.py index 794d258be..083e9fcdd 100644 --- 
a/viadot/flows/supermetrics_to_adls.py +++ b/viadot/flows/supermetrics_to_adls.py @@ -1,24 +1,23 @@ -import json import os -import shutil -from pathlib import Path from typing import Any, Dict, List, Union -import pandas as pd import pendulum -import prefect -from prefect import Flow, Task, apply_map, task +from prefect import Flow, Task, apply_map from prefect.backend import set_key_value from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging -from visions.functional import infer_type -from visions.typesets.complete_set import CompleteSet from ..task_utils import ( add_ingestion_metadata_task, + cleanup_validation_clutter, df_get_data_types_task, - df_mapp_mixed_dtypes_for_parquet, + df_map_mixed_dtypes_for_parquet, + df_to_csv, + df_to_parquet, + dtypes_to_json_task, + union_dfs_task, update_dtypes_dict, + write_to_json, ) from ..tasks import ( AzureDataLakeUpload, @@ -36,67 +35,6 @@ json_to_adls_task = AzureDataLakeUpload() -@task -def write_to_json(dict_, path): - - logger = prefect.context.get("logger") - - if os.path.isfile(path): - logger.warning(f"File {path} already exists. Overwriting...") - else: - logger.debug(f"Writing to {path}...") - - # create parent directories if they don't exist - Path(path).parent.mkdir(parents=True, exist_ok=True) - with open(path, mode="w") as f: - json.dump(dict_, f) - - logger.debug(f"Successfully wrote to {path}.") - - -@task -def union_dfs_task(dfs: List[pd.DataFrame]): - return pd.concat(dfs, ignore_index=True) - - -@task -def dtypes_to_json_task(dtypes_dict, local_json_path: str): - with open(local_json_path, "w") as fp: - json.dump(dtypes_dict, fp) - - -@task -def df_to_parquet_task(df, path: str, if_exists: str = "replace"): - if if_exists == "append": - if os.path.isfile(path): - parquet_df = pd.read_parquet(path) - out_df = pd.concat([parquet_df, df]) - else: - out_df = df - elif if_exists == "replace": - out_df = df - out_df.to_parquet(path, index=False) - - -@task -def df_to_csv_task(df, path: str, if_exists: str = "replace"): - if if_exists == "append": - if os.path.isfile(path): - csv_df = pd.read_csv(path) - out_df = pd.concat([csv_df, df]) - else: - out_df = df - elif if_exists == "replace": - out_df = df - out_df.to_csv(path, index=False) - - -@task -def cleanup_validation_clutter(expectations_path): - ge_project_path = Path(expectations_path).parent - shutil.rmtree(ge_project_path) - - class SupermetricsToADLS(Flow): def __init__( self, @@ -295,19 +233,19 @@ def gen_flow(self) -> Flow: df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) - df_to_be_loaded = df_mapp_mixed_dtypes_for_parquet( + df_to_be_loaded = df_map_mixed_dtypes_for_parquet( df_with_metadata, dtypes_dict, flow=self ) if self.output_file_extension == ".parquet": - df_to_file = df_to_parquet_task.bind( + df_to_file = df_to_parquet.bind( df=df_to_be_loaded, path=self.local_file_path, if_exists=self.if_exists, flow=self, ) else: - df_to_file = df_to_csv_task.bind( + df_to_file = df_to_csv.bind( df=df_with_metadata, path=self.local_file_path, if_exists=self.if_exists, diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 693962d13..177d13485 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -3,6 +3,8 @@ from .azure_sql import AzureSQL from .supermetrics import Supermetrics from .cloud_for_customers import CloudForCustomers +from .sharepoint import Sharepoint + # APIS from .uk_carbon_intensity import 
UKCarbonIntensity from .sqlite import SQLite diff --git a/viadot/sources/base.py b/viadot/sources/base.py index 9353a815a..a31574262 100644 --- a/viadot/sources/base.py +++ b/viadot/sources/base.py @@ -19,6 +19,7 @@ class Source: def __init__(self, *args, credentials: Dict[str, Any] = None, **kwargs): self.credentials = credentials self.data: pa.Table = None + self.logger = logger @abstractmethod def to_json(self): diff --git a/viadot/sources/cloud_for_customers.py b/viadot/sources/cloud_for_customers.py index 151f05001..5ad0d2a76 100644 --- a/viadot/sources/cloud_for_customers.py +++ b/viadot/sources/cloud_for_customers.py @@ -4,59 +4,171 @@ from typing import Any, Dict, List from urllib.parse import urljoin from ..config import local_config +from ..exceptions import APIError +from requests.adapters import HTTPAdapter +from requests.exceptions import ConnectionError, HTTPError, ReadTimeout, Timeout +from urllib3.exceptions import ProtocolError +import re +from requests.packages.urllib3.util.retry import Retry class CloudForCustomers(Source): - """ - Fetches data from Cloud for Customer. - - Args: - url (str, optional): The url to the API. Defaults to None. - endpoint (str, optional): The endpoint of the API. Defaults to None. - params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format. - """ - def __init__( self, *args, + report_url: str = None, url: str = None, endpoint: str = None, - params: Dict[str, Any] = {}, - **kwargs + params: Dict[str, Any] = None, + env: str = "QA", + credentials: Dict[str, Any] = None, + **kwargs, ): + """ + Fetches data from Cloud for Customer. + + Args: + report_url (str, optional): The url to the API in case of prepared report. Defaults to None. + url (str, optional): The url to the API. Defaults to None. + endpoint (str, optional): The endpoint of the API. Defaults to None. + params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format. + env (str, optional): The development environments. Defaults to 'QA'. 
+ """ super().__init__(*args, **kwargs) - credentials = local_config.get("CLOUD_FOR_CUSTOMERS") - self.api_url = url or credentials["server"] + + DEFAULT_CREDENTIALS = local_config["CLOUD_FOR_CUSTOMERS"].get(env) + self.credentials = credentials or DEFAULT_CREDENTIALS + self.url = url or self.credentials.get("server") + self.report_url = report_url + self.is_report = bool(report_url) self.query_endpoint = endpoint - self.params = params + self.params = params or {} self.params["$format"] = "json" - self.auth = (credentials["username"], credentials["password"]) - def to_records(self) -> List: - try: - response = requests.get( - urljoin(self.api_url, self.query_endpoint), - params=self.params, - auth=self.auth, - ) - response.raise_for_status() + if self.credentials: + self.auth = (self.credentials["username"], self.credentials["password"]) + else: + self.auth = (None, None) - except Exception as e: - raise + if self.url: + self.full_url = urljoin(self.url, self.query_endpoint) + + @staticmethod + def change_to_meta_url(url: str) -> str: + start = url.split(".svc")[0] + url_raw = url.split("?")[0] + end = url_raw.split("/")[-1] + meta_url = start + ".svc/$metadata?entityset=" + end + return meta_url + + def _to_records_report(self) -> List[Dict[str, Any]]: + url = self.report_url + records = [] + while url: + response = self.get_response(url) + response_json = response.json() + new_records = self.response_to_entity_list(response_json, url) + records.extend(new_records) + + url = response_json["d"].get("__next") + + return records - dirty_json = response.json() + def _to_records_other(self) -> List[Dict[str, Any]]: + url = self.full_url + records = [] + while url: + response = self.get_response(self.full_url, params=self.params) + response_json = response.json() + if isinstance(response_json["d"], dict): + # ODATA v2+ API + new_records = response_json["d"].get("results") + url = response_json["d"].get("__next") + else: + # ODATA v1 + new_records = response_json["d"] + url = response_json.get("__next") + + records.extend(new_records) + + return records + + def to_records(self) -> List[Dict[str, Any]]: + """Download a list of entities in the records format""" + if self.is_report: + return self._to_records_report() + else: + return self._to_records_other() + + def response_to_entity_list(self, dirty_json: Dict[str, Any], url: str) -> List: + metadata_url = self.change_to_meta_url(url) + column_maper_dict = self.map_columns(metadata_url) entity_list = [] for element in dirty_json["d"]["results"]: new_entity = {} for key, object_of_interest in element.items(): - if key != "__metadata" and key != "Photo" and key != "": - # skip the column which contain nested structure + if key not in ["__metadata", "Photo", "", "Picture"]: if "{" not in str(object_of_interest): - new_entity[key] = object_of_interest + new_key = column_maper_dict.get(key) + if new_key: + new_entity[new_key] = object_of_interest + else: + new_entity[key] = object_of_interest entity_list.append(new_entity) return entity_list - def to_df(self, fields: List[str] = None, if_empty: str = None) -> pd.DataFrame: + def map_columns(self, url: str = None) -> Dict[str, str]: + column_mapping = {} + if url: + auth = (self.credentials["username"], self.credentials["password"]) + response = requests.get(url, auth=auth) + for sentence in response.text.split("/>"): + result = re.search( + r'(?<=Name=")([^"]+).+(sap:label=")([^"]+)+', sentence + ) + if result: + key = result.groups(0)[0] + val = result.groups(0)[2] + column_mapping[key] = val + 
return column_mapping + + def get_response(self, url: str, params: Dict[str, Any] = None): + try: + session = requests.Session() + retry_strategy = Retry( + total=3, + status_forcelist=[429, 500, 502, 503, 504], + backoff_factor=1, + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + + session.mount("http://", adapter) + session.mount("https://", adapter) + response = session.get(url, params=params, auth=self.auth) + response.raise_for_status() + # TODO: abstract below and put as handle_api_response() into utils.py + except ReadTimeout as e: + msg = "The connection was successful, " + msg += f"however the API call to {url} timed out after .... s " + msg += "while waiting for the server to return data." + raise APIError(msg) + except HTTPError as e: + raise APIError( + f"The API call to {url} failed. " + "Perhaps your account credentials need to be refreshed?", + ) from e + except (ConnectionError, Timeout) as e: + raise APIError( + f"The API call to {url} failed due to connection issues." + ) from e + except ProtocolError as e: + raise APIError(f"Did not receive any reponse for the API call to {url}.") + except Exception as e: + raise APIError("Unknown error.") from e + + return response + + def to_df(self, fields: List[str] = None, if_empty: str = "warn") -> pd.DataFrame: records = self.to_records() df = pd.DataFrame(data=records) if fields: diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py new file mode 100644 index 000000000..349009b4d --- /dev/null +++ b/viadot/sources/sharepoint.py @@ -0,0 +1,55 @@ +from typing import Dict, Any +import sharepy + +from .base import Source +from ..config import local_config + + +class Sharepoint(Source): + """ + A Sharepoint class to connect and download specific Excel file from Sharepoint. + + Args: + credentials (dict): In credentials should be included: + "site" - Path to sharepoint website (e.g : {tenant_name}.sharepoint.com) + "username" - Sharepoint username (e.g username@{tenant_name}.com) + "password" + download_from_path (str, optional): Full url to file + (e.g : https://{tenant_name}.sharepoint.com/sites/{directory}/Shared%20Documents/Dashboard/file). Defaults to None. 
+ """ + + def __init__( + self, + credentials: Dict[str, Any] = None, + download_from_path: str = None, + *args, + **kwargs, + ): + + DEFAULT_CREDENTIALS = local_config.get("SHAREPOINT") + credentials = credentials or DEFAULT_CREDENTIALS + self.download_from_path = download_from_path or DEFAULT_CREDENTIALS["file_url"] + self.required_credentials = ["site", "username", "password"] + super().__init__(*args, credentials=credentials, **kwargs) + + def get_connection(self) -> sharepy.session.SharePointSession: + if any([rq not in self.credentials for rq in self.required_credentials]): + raise ValueError("Missing credentials.") + + return sharepy.connect( + site=self.credentials["site"], + username=self.credentials["username"], + password=self.credentials["password"], + ) + + def download_file( + self, + download_from_path: str = None, + download_to_path: str = "Sharepoint_file.xlsm", + ) -> None: + + conn = self.get_connection() + conn.getfile( + url=download_from_path or self.download_from_path, + filename=download_to_path, + ) diff --git a/viadot/sources/supermetrics.py b/viadot/sources/supermetrics.py index 83863383e..a8106425b 100644 --- a/viadot/sources/supermetrics.py +++ b/viadot/sources/supermetrics.py @@ -13,15 +13,12 @@ from urllib3.exceptions import ProtocolError from ..config import local_config +from ..exceptions import APIError from .base import Source logger = logging.get_logger(__name__) -class APIError(Exception): - pass - - class Supermetrics(Source): """ A class implementing the Supermetrics API. diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 6e61d924c..3a5058a6f 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -1,17 +1,22 @@ +import json import os from datetime import datetime, timezone -from typing import List +from typing import List, Literal import pandas as pd +from pathlib import Path +import shutil +import json -import visions from visions.functional import infer_type from visions.typesets.complete_set import CompleteSet import prefect from prefect import task from prefect.storage import Git +from prefect.utilities import logging +logger = logging.get_logger() METADATA_COLUMNS = {"_viadot_downloaded_at_utc": "DATETIME"} @@ -52,6 +57,12 @@ def get_latest_timestamp_file_path(files: List[str]) -> str: return latest_file +@task +def dtypes_to_json_task(dtypes_dict, local_json_path: str): + with open(local_json_path, "w") as fp: + json.dump(dtypes_dict, fp) + + @task def chunk_df(df: pd.DataFrame, size: int = 10_000) -> List[pd.DataFrame]: n_rows = df.shape[0] @@ -68,7 +79,9 @@ def df_get_data_types_task(df: pd.DataFrame) -> dict: @task -def df_mapp_mixed_dtypes_for_parquet(df, dtypes_dict) -> pd.DataFrame: +def df_map_mixed_dtypes_for_parquet( + df: pd.DataFrame, dtypes_dict: dict +) -> pd.DataFrame: """ Pandas is not able to handle mixed dtypes in the column in to_parquet Mapping 'object' visions dtype to 'string' dtype to allow Pandas to_parquet @@ -80,7 +93,6 @@ def df_mapp_mixed_dtypes_for_parquet(df, dtypes_dict) -> pd.DataFrame: Returns: df_mapped (pd.DataFrame): Pandas DataFrame with mapped Data Types to workaround Pandas to_parquet bug connected with mixed dtypes in object:. """ - df_mapped = df.copy() for col, dtype in dtypes_dict.items(): if dtype == "Object": @@ -89,7 +101,7 @@ def df_mapp_mixed_dtypes_for_parquet(df, dtypes_dict) -> pd.DataFrame: @task -def update_dtypes_dict(dtypes_dict): +def update_dtypes_dict(dtypes_dict: dict) -> dict: """ Task to update dtypes_dictionary that will be stored in the schema. 
It's required due to workaround Pandas to_parquet bug connected with mixed dtypes in object @@ -106,6 +118,82 @@ def update_dtypes_dict(dtypes_dict): return dtypes_dict_updated +@task +def df_to_csv( + df: pd.DataFrame, + path: str, + sep="\t", + if_exists: Literal["append", "replace", "skip"] = "replace", + **kwargs, +) -> None: + if if_exists == "append" and os.path.isfile(path): + csv_df = pd.read_csv(path) + out_df = pd.concat([csv_df, df]) + elif if_exists == "replace": + out_df = df + elif if_exists == "skip": + logger.info("Skipped.") + return + else: + out_df = df + out_df.to_csv(path, index=False, sep=sep) + + +@task +def df_to_parquet( + df: pd.DataFrame, + path: str, + if_exists: Literal["append", "replace", "skip"] = "replace", + **kwargs, +) -> None: + if if_exists == "append" and os.path.isfile(path): + parquet_df = pd.read_parquet(path) + out_df = pd.concat([parquet_df, df]) + elif if_exists == "replace": + out_df = df + elif if_exists == "skip": + logger.info("Skipped.") + return + else: + out_df = df + out_df.to_parquet(path, index=False, **kwargs) + + +@task +def dtypes_to_json(dtypes_dict: dict, local_json_path: str) -> None: + with open(local_json_path, "w") as fp: + json.dump(dtypes_dict, fp) + + +@task +def union_dfs_task(dfs: List[pd.DataFrame]): + return pd.concat(dfs, ignore_index=True) + + +@task +def write_to_json(dict_, path): + + logger = prefect.context.get("logger") + + if os.path.isfile(path): + logger.warning(f"File {path} already exists. Overwriting...") + else: + logger.debug(f"Writing to {path}...") + + # create parent directories if they don't exist + Path(path).parent.mkdir(parents=True, exist_ok=True) + with open(path, mode="w") as f: + json.dump(dict_, f) + + logger.debug(f"Successfully wrote to {path}.") + + +@task +def cleanup_validation_clutter(expectations_path): + ge_project_path = Path(expectations_path).parent + shutil.rmtree(ge_project_path) + + class Git(Git): @property def git_clone_url(self): diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index f72d4281d..1b8658ee9 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -22,4 +22,5 @@ from .great_expectations import RunGreatExpectationsValidation from .sqlite import SQLiteInsert, SQLiteSQLtoDF, SQLiteQuery from .supermetrics import SupermetricsToCSV, SupermetricsToDF -from .cloud_for_customers import CloudForCustomersToCSV, CloudForCustomersToDF +from .sharepoint import SharepointToDF +from .cloud_for_customers import c4c_report_to_df, c4c_to_df diff --git a/viadot/tasks/cloud_for_customers.py b/viadot/tasks/cloud_for_customers.py index f22ff015a..47b34a367 100644 --- a/viadot/tasks/cloud_for_customers.py +++ b/viadot/tasks/cloud_for_customers.py @@ -1,127 +1,46 @@ -from datetime import timedelta -from typing import Any, Dict, List, Union - +from prefect import task import pandas as pd -from prefect import Task -from prefect.utilities.tasks import defaults_from_attrs - from ..sources import CloudForCustomers - - -class CloudForCustomersToCSV(Task): - """ - Task for downloading data from the Cloud For Customers to a csv file. - - Args: - if_exists (str, optional): What to do if the table already exists. - if_empty (str, optional): What to do if query returns no data. Defaults to "warn". 
- sep: The separator used in csv file by default '\t', - """ - - def __init__( - self, - *args, - if_exists: str = "replace", - if_empty: str = "warn", - sep="\t", - **kwargs, - ): - - self.if_exists = if_exists - self.if_empty = if_empty - self.sep = sep - - super().__init__( - name="cloud_for_customers_to_csv", - *args, - **kwargs, - ) - - def __call__(self): - """Download Cloud For Customers data to a CSV""" - super().__call__(self) - - def run( - self, - path: str = "cloud_for_customers_extract.csv", - url: str = None, - endpoint: str = None, - fields: List[str] = None, - params: Dict[str, Any] = {}, - ): - """ - Run Task CloudForCustomersToCSV. - - Args: - path (str, optional): The path to the output file. By default in current dir with filename "cloud_for_customers_extract.csv". - url (str, optional): The url to the API. Defaults value from credential.json file. - endpoint (str, optional): The endpoint of the API. Defaults to None. - fields (List, optional): The appropriate columns of interest, by default the task returns each column. - params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format. - """ - cloud_for_customers = CloudForCustomers( - url=url, endpoint=endpoint, params=params - ) - - # Download data to a local CSV file - self.logger.info(f"Downloading data to {path}...") - cloud_for_customers.to_csv( - path=path, - if_exists=self.if_exists, - if_empty=self.if_empty, - sep=self.sep, - fields=fields, - ) - self.logger.info(f"Successfully downloaded data to {path}.") - - -class CloudForCustomersToDF(Task): - """ - Task for downloading data from the Cloud For Customers to a pandas DataFrame. - - Args: - if_empty (str, optional): What to do if query returns no data. Defaults to "warn". - """ - - def __init__( - self, - *args, - if_empty: str = "warn", - **kwargs, - ): - - self.if_empty = if_empty - - super().__init__( - name="cloud_for_customers_to_df", - *args, - **kwargs, - ) - - def __call__(self): - """Download Cloud For Customers data to a DF""" - super().__call__(self) - - def run( - self, - url: str = None, - endpoint: str = None, - fields: List[str] = None, - params: Dict[str, Any] = {}, - ): - """ - Run Task CloudForCustomersToDF. - - Args: - url (str, optional): The url to the API. Defaults value from credential.json file. - endpoint (str, optional): The endpoint of the API. Defaults to None. - fields (List, optional): The appropriate columns of interest, by default the task returns each column. - params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format. 
- """ - cloud_for_customers = CloudForCustomers( - url=url, endpoint=endpoint, params=params - ) - - df = cloud_for_customers.to_df(if_empty=self.if_empty, fields=fields) - self.logger.info(f"Successfully downloaded data to a DataFrame.") - return df +from typing import Any, Dict, List + + +@task +def c4c_report_to_df(report_url: str, env: str = "QA", skip: int = 0, top: int = 1000): + final_df = pd.DataFrame() + next_batch = True + while next_batch: + new_url = f"{report_url}&$top={top}&$skip={skip}" + chunk_from_url = CloudForCustomers(report_url=new_url, env=env) + df = chunk_from_url.to_df() + final_df = final_df.append(df) + if not final_df.empty: + df_count = df.count()[1] + if df_count != top: + next_batch = False + skip += top + else: + break + return final_df + + +@task +def c4c_to_df( + url: str = None, + endpoint: str = None, + report_url: str = None, + fields: List[str] = None, + params: Dict[str, Any] = {}, + env: str = "QA", + if_empty: str = "warn", +): + cloud_for_customers = CloudForCustomers( + url=url, + report_url=report_url, + endpoint=endpoint, + params=params, + env=env, + ) + + df = cloud_for_customers.to_df(if_empty=if_empty, fields=fields) + + return df diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py new file mode 100644 index 000000000..9ae3b2d82 --- /dev/null +++ b/viadot/tasks/sharepoint.py @@ -0,0 +1,195 @@ +from typing import List +import os +import copy +import pandas as pd +from prefect import Task +from prefect.utilities.tasks import defaults_from_attrs +from prefect.utilities import logging + +from ..exceptions import ValidationError +from ..sources import Sharepoint + +logger = logging.get_logger() + + +class SharepointToDF(Task): + """ + Task for converting data from Sharepoint excel file to a pandas DataFrame. + + Args: + path_to_file (str): Path to Excel file. + url_to_file (str): Link to a file on Sharepoint. + (e.g : https://{tenant_name}.sharepoint.com/sites/{folder}/Shared%20Documents/Dashboard/file). Defaults to None. + nrows (int, optional): Number of rows to read at a time. Defaults to 50000. + sheet_number (int): Sheet number to be extracted from file. Counting from 0, if None all sheets are axtracted. Defaults to None. + validate_excel_file (bool, optional): Check if columns in separate sheets are the same. Defaults to False. + if_empty (str, optional): What to do if query returns no data. Defaults to "warn". + + Returns: + pd.DataFrame: Pandas data frame + """ + + def __init__( + self, + path_to_file: str = None, + url_to_file: str = None, + nrows: int = 50000, + sheet_number: int = None, + validate_excel_file: bool = False, + if_empty: str = "warn", + *args, + **kwargs, + ): + + self.if_empty = if_empty + self.path_to_file = path_to_file + self.url_to_file = url_to_file + self.nrows = nrows + self.sheet_number = sheet_number + self.validate_excel_file = validate_excel_file + + super().__init__( + name="sharepoint_to_df", + *args, + **kwargs, + ) + + def __call__(self): + """Download Sharepoint data to a DF""" + super().__call__(self) + + def check_column_names( + self, df_header: List[str] = None, header_to_compare: List[str] = None + ) -> List[str]: + """ + Check if column names in sheets are the same. + + Args: + df_header (List[str]): Header of df from excel sheet. + header_to_compare (List[str]): Header of df from previous excel sheet. 
+ + Returns: + list: list of columns + """ + df_header_list = df_header.columns.tolist() + if header_to_compare is not None: + if df_header_list != header_to_compare: + raise ValidationError("Columns in sheets are different") + + return df_header_list + + def df_replace_special_chars(self, df: pd.DataFrame): + return df.replace(r"\n|\t", "", regex=True) + + def split_sheet( + self, + sheetname: str = None, + nrows: int = None, + chunks: List[pd.DataFrame] = None, + **kwargs, + ) -> List[pd.DataFrame]: + """ + Split sheet by chunks. + + Args: + sheetname (str): The sheet on which we iterate. + nrows (int): Number of rows to read at a time. + chunks(List[pd.DataFrame]): List of data in chunks. + + Returns: + List[pd.DataFrame]: List of data frames + """ + skiprows = 1 + logger.info(f"Worksheet: {sheetname}") + temp_chunks = copy.deepcopy(chunks) + i_chunk = 0 + while True: + df_chunk = pd.read_excel( + self.path_to_file, + sheet_name=sheetname, + nrows=nrows, + skiprows=skiprows, + header=None, + **kwargs, + ) + skiprows += nrows + # When there is no data, we know we can break out of the loop. + if df_chunk.empty: + break + else: + logger.debug(f" - chunk {i_chunk+1} ({df_chunk.shape[0]} rows)") + df_chunk["sheet_name"] = sheetname + temp_chunks.append(df_chunk) + i_chunk += 1 + return temp_chunks + + @defaults_from_attrs( + "path_to_file", + "url_to_file", + "nrows", + "sheet_number", + "validate_excel_file", + ) + def run( + self, + path_to_file: str = None, + url_to_file: str = None, + nrows: int = 50000, + validate_excel_file: bool = False, + sheet_number: int = None, + **kwargs, + ) -> None: + """ + Run Task ExcelToDF. + + Args: + path_to_file (str): Path to Excel file. Defaults to None. + url_to_file (str): Link to a file on Sharepoint. Defaults to None. + nrows (int, optional): Number of rows to read at a time. Defaults to 50000. + sheet_number (int): Sheet number to be extracted from file. Counting from 0, if None all sheets are axtracted. Defaults to None. + validate_excel_file (bool, optional): Check if columns in separate sheets are the same. Defaults to False. + + Returns: + pd.DataFrame: Pandas data frame + """ + self.path_to_file = path_to_file + self.url_to_file = url_to_file + path_to_file = os.path.basename(self.path_to_file) + self.sheet_number = sheet_number + + s = Sharepoint(download_from_path=self.url_to_file) + s.download_file(download_to_path=path_to_file) + + self.nrows = nrows + excel = pd.ExcelFile(self.path_to_file) + + if self.sheet_number is not None: + sheet_names_list = [excel.sheet_names[self.sheet_number]] + else: + sheet_names_list = excel.sheet_names + + header_to_compare = None + chunks = [] + + for sheetname in sheet_names_list: + df_header = pd.read_excel(self.path_to_file, sheet_name=sheetname, nrows=0) + + if validate_excel_file: + header_to_compare = self.check_column_names( + df_header, header_to_compare + ) + + chunks = self.split_sheet(sheetname, self.nrows, chunks) + df_chunks = pd.concat(chunks) + + # Rename the columns to concatenate the chunks with the header. + columns = {i: col for i, col in enumerate(df_header.columns.tolist())} + last_column = len(columns) + columns[last_column] = "sheet_name" + + df_chunks.rename(columns=columns, inplace=True) + df = pd.concat([df_header, df_chunks]) + + df = self.df_replace_special_chars(df) + self.logger.info(f"Successfully converted data to a DataFrame.") + return df
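Reviewer note (not part of the patch): a minimal usage sketch of the new `Sharepoint` source. The tenant, account, password, and file URL below are placeholder values; in practice they come from `local_config["SHAREPOINT"]`.

from viadot.sources import Sharepoint

# Placeholder credentials; normally read from local_config["SHAREPOINT"].
credentials = {
    "site": "tenant.sharepoint.com",
    "username": "user@tenant.com",
    "password": "***",
}

s = Sharepoint(
    credentials=credentials,
    # Placeholder file URL; the source falls back to local_config["SHAREPOINT"]["file_url"].
    download_from_path="https://tenant.sharepoint.com/sites/site_name/Shared%20Documents/Dashboard/file.xlsm",
)
# Authenticates via sharepy; get_connection() raises ValueError when site/username/password are missing.
s.download_file(download_to_path="file.xlsm")  # saves the Excel file locally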
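Reviewer note (not part of the patch): a sketch of how the two new flows could be instantiated and run locally. The flow names, URLs, ADLS paths, and the Key Vault secret name are hypothetical; the keyword arguments mirror the constructor signatures introduced in this PR.

from viadot.flows import SharepointToADLS, CloudForCustomersReportToADLS

sharepoint_flow = SharepointToADLS(
    name="sharepoint extract",                     # placeholder flow name
    url_to_file="https://tenant.sharepoint.com/sites/site_name/Shared%20Documents/Dashboard/file.xlsm",
    path_to_file="file.xlsm",                      # local download target
    sheet_number=0,
    output_file_extension=".parquet",
    adls_dir_path="raw/sharepoint",                # placeholder ADLS catalog path
    adls_sp_credentials_secret="ADLS-SP-SECRET",   # placeholder Key Vault secret name
)

c4c_flow = CloudForCustomersReportToADLS(
    name="c4c report extract",                     # placeholder flow name
    report_url="https://example.com/odata/report.svc/ReportQuery",  # placeholder report URL
    env="QA",
    channels=["CHANNEL_A"],                        # filter values appended to the report URL
    months=["01"],
    years=["2021"],
    local_file_path="c4c_report.csv",
    adls_dir_path="raw/c4c",                       # placeholder ADLS catalog path
    adls_sp_credentials_secret="ADLS-SP-SECRET",
)

sharepoint_flow.run()
c4c_flow.run()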
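Reviewer note (not part of the patch): a short sketch of the new task_utils helpers and the mixed-dtype workaround they implement. Calling `.run()` directly mirrors the unit test added in this PR; the sample DataFrame and output file names are made up.

import pandas as pd

from viadot.task_utils import (
    df_get_data_types_task,
    df_map_mixed_dtypes_for_parquet,
    df_to_parquet,
    dtypes_to_json,
)

df = pd.DataFrame({"mixed": [1, "two", 3.5], "num": [1, 2, 3]})  # "mixed" infers as Object

dtypes = df_get_data_types_task.run(df)                      # e.g. {"mixed": "Object", "num": "Integer", ...}
df_mapped = df_map_mixed_dtypes_for_parquet.run(df, dtypes)  # casts Object columns to string for to_parquet
df_to_parquet.run(df=df_mapped, path="sample.parquet")       # the raw mixed-dtype df may fail here
dtypes_to_json.run(dtypes_dict=dtypes, local_json_path="sample_dtypes.json")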