From 0c43153a48c95092d972f41c8af73a6e607a2e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Ziemianek?= <49795849+Rafalz13@users.noreply.github.com> Date: Tue, 6 Aug 2024 15:39:58 +0200 Subject: [PATCH] Sharepoint orchestration code refactor (#950) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ Moved sharepoint tasks from prefect_viadot repo * ✨ Moved sharepoint_to_redshift_spectrum flow from prefect_viadot repo * 🔥 Cleaned up init for prefect tasks * Added `viadot.orchestration.prefect` --- .../flows/sharepoint_to_redshift_spectrum.py | 238 +++--------------- .../orchestration/prefect/tasks/__init__.py | 11 +- .../orchestration/prefect/tasks/sharepoint.py | 144 ++--------- 3 files changed, 72 insertions(+), 321 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/sharepoint_to_redshift_spectrum.py b/src/viadot/orchestration/prefect/flows/sharepoint_to_redshift_spectrum.py index ddd2b9c6e..9685f76be 100644 --- a/src/viadot/orchestration/prefect/flows/sharepoint_to_redshift_spectrum.py +++ b/src/viadot/orchestration/prefect/flows/sharepoint_to_redshift_spectrum.py @@ -1,105 +1,15 @@ """Flows for downloading data from Sharepoint and uploading it to AWS Redshift Spectrum.""" # noqa: W505 -from pathlib import Path from typing import Any, Literal -import pandas as pd from prefect import flow from viadot.orchestration.prefect.tasks import ( df_to_redshift_spectrum, - get_endpoint_type_from_url, - scan_sharepoint_folder, sharepoint_to_df, - validate_and_reorder_dfs_columns, ) -def load_data_from_sharepoint( - file_sheet_mapping: dict | None, - download_all_files: bool, - sharepoint_url: str, - sheet_name: str | list[str | int] | int | None = None, - columns: str | list[str] | list[int] | None = None, - na_values: list[str] | None = None, - credentials_config: dict[str, Any] | None = None, -) -> dict: - """Loads data from SharePoint and returns it as a dictionary of DataFrames. 
- - This function fetches data from SharePoint based on the provided file-sheet - mapping or by downloading all files in a specified folder. It returns the - data as a dictionary where keys are filenames and values are the respective - DataFrames. - - Args: - file_sheet_mapping (dict): A dictionary where keys are filenames and values are - the sheet names to be loaded from each file. If provided, only these files - and sheets will be downloaded. - download_all_files (bool): A flag indicating whether to download all files from - the SharePoint folder specified by the `sharepoint_url`. This is ignored if - `file_sheet_mapping` is provided. - sharepoint_url (str): The base URL of the SharePoint site or folder. - sheet_name (str): The name of the sheet to load if `file_sheet_mapping` is not - provided. This is used when downloading all files. - columns (str | list[str] | list[int], optional): Which columns to ingest. - Defaults to None. - na_values (list[str] | None): Additional strings to recognize as NA/NaN. - If list passed, the specific NA values for each column will be recognized. - Defaults to None. - credentials_config (dict, optional): A dictionary containing credentials and - configuration for SharePoint. Defaults to None. - - Returns: - dict: A dictionary where keys are filenames and values are DataFrames containing - the data from the corresponding sheets. - - Options: - - If `file_sheet_mapping` is provided, only the specified files and sheets are - downloaded. - - If `download_all_files` is True, all files in the specified SharePoint folder - are downloaded and the data is loaded from the specified `sheet_name`. 
- """ - dataframes_dict = {} - credentials_secret = ( - credentials_config.get("secret") if credentials_config else None - ) - credentials = credentials_config.get("credentials") if credentials_config else None - config_key = credentials_config.get("config_key") if credentials_config else None - - if file_sheet_mapping: - for file, sheet in file_sheet_mapping.items(): - df = sharepoint_to_df( - url=sharepoint_url + file, - sheet_name=sheet, - columns=columns, - na_values=na_values, - credentials_secret=credentials_secret, - credentials=credentials, - config_key=config_key, - ) - dataframes_dict[file] = df - elif download_all_files: - list_of_urls = scan_sharepoint_folder( - url=sharepoint_url, - credentials_secret=credentials_secret, - credentials=credentials, - config_key=config_key, - ) - for file_url in list_of_urls: - df = sharepoint_to_df( - url=file_url, - sheet_name=sheet_name, - columns=columns, - na_values=na_values, - credentials_secret=credentials_secret, - credentials=credentials, - config_key=config_key, - ) - file_name = Path(file_url).stem + Path(file_url).suffix - dataframes_dict[file_name] = df - return dataframes_dict - - @flow( name="extract--sharepoint--redshift_spectrum", description="Extract data from Sharepoint and load it into AWS Redshift Spectrum.", @@ -127,14 +37,22 @@ def sharepoint_to_redshift_spectrum( # noqa: PLR0913, PLR0917 sharepoint_config_key: str | None = None, sharepoint_credentials: dict[str, Any] | None = None, file_sheet_mapping: dict | None = None, - download_all_files: bool = False, - return_as_one_table: bool = False, ) -> None: """Extract data from SharePoint and load it into AWS Redshift Spectrum. - This function downloads data from SharePoint and uploads it to AWS Redshift - Spectrum, either as a single table or as multiple tables depending on the provided - parameters. + This function downloads data either from SharePoint file or the whole directory and + uploads it to AWS Redshift Spectrum. 
+ + Modes: + If the `URL` ends with the file (e.g. ../file.xlsx) it downloads only the file and + creates a table from it. + If the `URL` ends with the folder (e.g. ../folder_name/) it downloads multiple files + and creates a table from them: + - If `file_sheet_mapping` is provided, it downloads and processes only + the specified files and sheets. + - If `file_sheet_mapping` is NOT provided, it downloads and processes all of + the files from the chosen folder. + Args: sharepoint_url (str): The URL to the file. @@ -175,106 +93,32 @@ def sharepoint_to_redshift_spectrum( # noqa: PLR0913, PLR0917 relevant credentials. Defaults to None. sharepoint_credentials (dict, optional): Credentials to Sharepoint. Defaults to None. - file_sheet_mapping(dict, optional): Dictionary with mapping sheet for each file - that should be downloaded. If the parameter is provided only data from this - dictionary are downloaded. Defaults to empty dictionary. - download_all_files (bool, optional): Whether to download all files from - the folder. Defaults to False. - return_as_one_table (bool, optional): Whether to load data to a single table. - Defaults to False. - - The function operates in two main modes: - 1. If `file_sheet_mapping` is provided, it downloads and processes only - the specified files and sheets. - 2. If `download_all_files` is True, it scans the SharePoint folder for all files - and processes them. - - Additionally, depending on the value of `return_as_one_table`, the data is either - combined into a single table or uploaded as multiple tables. - + file_sheet_mapping (dict): A dictionary where keys are filenames and values are + the sheet names to be loaded from each file. If provided, only these files + and sheets will be downloaded. Defaults to None. 
""" - sharepoint_credentials_config = { - "secret": sharepoint_credentials_secret, - "credentials": sharepoint_credentials, - "config_key": sharepoint_config_key, - } - endpoint_type = get_endpoint_type_from_url(url=sharepoint_url) - - if endpoint_type == "file": - df = sharepoint_to_df( - url=sharepoint_url, - sheet_name=sheet_name, - columns=columns, - na_values=na_values, - credentials_secret=sharepoint_credentials_secret, - config_key=sharepoint_config_key, - credentials=sharepoint_credentials, - ) - - df_to_redshift_spectrum( - df=df, - to_path=to_path, - schema_name=schema_name, - table=table, - extension=extension, - if_exists=if_exists, - partition_cols=partition_cols, - index=index, - compression=compression, - sep=sep, - description=description, - credentials=aws_credentials, - config_key=aws_config_key, - ) - else: - dataframes_dict = load_data_from_sharepoint( - file_sheet_mapping=file_sheet_mapping, - download_all_files=download_all_files, - sharepoint_url=sharepoint_url, - sheet_name=sheet_name, - columns=columns, - na_values=na_values, - credentials_config=sharepoint_credentials_config, - ) - - if return_as_one_table is True: - dataframes_list = list(dataframes_dict.values()) - validated_and_reordered_dfs = validate_and_reorder_dfs_columns( - dataframes_list=dataframes_list - ) - final_df = pd.concat(validated_and_reordered_dfs, ignore_index=True) - - df_to_redshift_spectrum( - df=final_df, - to_path=to_path, - schema_name=schema_name, - table=table, - extension=extension, - if_exists=if_exists, - partition_cols=partition_cols, - index=index, - compression=compression, - sep=sep, - description=description, - credentials=aws_credentials, - config_key=aws_config_key, - ) - - elif return_as_one_table is False: - for file_name, df in dataframes_dict.items(): - file_name_without_extension = Path(file_name).stem - df_to_redshift_spectrum( - df=df, - to_path=f"{to_path}_{file_name_without_extension}", # to test - schema_name=schema_name, - 
table=f"{table}_{file_name_without_extension}", - extension=extension, - if_exists=if_exists, - partition_cols=partition_cols, - index=index, - compression=compression, - sep=sep, - description=description, - credentials=aws_credentials, - config_key=aws_config_key, - ) + df = sharepoint_to_df( + url=sharepoint_url, + sheet_name=sheet_name, + columns=columns, + na_values=na_values, + file_sheet_mapping=file_sheet_mapping, + credentials_secret=sharepoint_credentials_secret, + config_key=sharepoint_config_key, + credentials=sharepoint_credentials, + ) + df_to_redshift_spectrum( + df=df, + to_path=to_path, + schema_name=schema_name, + table=table, + extension=extension, + if_exists=if_exists, + partition_cols=partition_cols, + index=index, + compression=compression, + sep=sep, + description=description, + credentials=aws_credentials, + config_key=aws_config_key, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 1ab699655..fe6f928d6 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -17,12 +17,9 @@ from .redshift_spectrum import df_to_redshift_spectrum # noqa: F401 from .s3 import s3_upload_file # noqa: F401 from .sap_rfc import sap_rfc_to_df # noqa: F401 -from .sharepoint import get_endpoint_type_from_url # noqa: F401 -from .sharepoint import scan_sharepoint_folder # noqa: F401 -from .sharepoint import sharepoint_download_file # noqa: F401 -from .sharepoint import sharepoint_to_df # noqa: F401 -from .sharepoint import validate_and_reorder_dfs_columns # noqa: F401 +from .sharepoint import ( + sharepoint_download_file, # noqa: F401 + sharepoint_to_df, # noqa: F401 +) from .sql_server import (create_sql_server_table, sql_server_query, sql_server_to_df) - - diff --git a/src/viadot/orchestration/prefect/tasks/sharepoint.py b/src/viadot/orchestration/prefect/tasks/sharepoint.py index 75853844b..d06dd8806 100644 --- 
a/src/viadot/orchestration/prefect/tasks/sharepoint.py +++ b/src/viadot/orchestration/prefect/tasks/sharepoint.py @@ -1,7 +1,6 @@ """Tasks for interacting with Microsoft Sharepoint.""" from typing import Any -from urllib.parse import urlparse import pandas as pd from prefect import get_run_logger, task @@ -9,7 +8,9 @@ from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError from viadot.orchestration.prefect.utils import get_credentials from viadot.sources import Sharepoint -from viadot.sources.sharepoint import SharepointCredentials, get_last_segment_from_url +from viadot.sources.sharepoint import ( + SharepointCredentials, +) @task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60) @@ -18,6 +19,7 @@ def sharepoint_to_df( sheet_name: str | list[str | int] | int | None = None, columns: str | list[str] | list[int] | None = None, tests: dict[str, Any] | None = None, + file_sheet_mapping: dict | None = None, na_values: list[str] | None = None, credentials_secret: str | None = None, config_key: str | None = None, @@ -25,6 +27,16 @@ def sharepoint_to_df( ) -> pd.DataFrame: """Load an Excel file stored on Microsoft Sharepoint into a pandas `DataFrame`. + Modes: + If the `URL` ends with the file (e.g. ../file.xlsx) it downloads only the file and + creates a DataFrame from it. + If the `URL` ends with the folder (e.g. ../folder_name/) it downloads multiple files + and creates a DataFrame from them: + - If `file_sheet_mapping` is provided, it downloads and processes only + the specified files and sheets. + - If `file_sheet_mapping` is NOT provided, it downloads and processes all of + the files from the chosen folder. + Args: url (str): The URL to the file. sheet_name (str | list | int, optional): Strings are used @@ -34,12 +46,15 @@ def sharepoint_to_df( Defaults to None. columns (str | list[str] | list[int], optional): Which columns to ingest. Defaults to None. - na_values (list[str] | None): Additional strings to recognize as NA/NaN. 
- If list passed, the specific NA values for each column will be recognized. - Defaults to None. credentials_secret (str, optional): The name of the secret storing the credentials. Defaults to None. More info on: https://docs.prefect.io/concepts/blocks/ + file_sheet_mapping (dict): A dictionary where keys are filenames and values are + the sheet names to be loaded from each file. If provided, only these files + and sheets will be downloaded. Defaults to None. + na_values (list[str] | None): Additional strings to recognize as NA/NaN. + If list passed, the specific NA values for each column will be recognized. + Defaults to None. tests (dict[str], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate` function from viadot.utils. Defaults to None. @@ -61,14 +76,19 @@ def sharepoint_to_df( logger.info(f"Downloading data from {url}...") df = s.to_df( - url, sheet_name=sheet_name, tests=tests, usecols=columns, na_values=na_values + url, + sheet_name=sheet_name, + tests=tests, + usecols=columns, + na_values=na_values, + file_sheet_mapping=file_sheet_mapping, ) logger.info(f"Successfully downloaded data from {url}.") return df -@task +@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60) def sharepoint_download_file( url: str, to_path: str, @@ -98,113 +118,3 @@ def sharepoint_download_file( logger.info(f"Downloading data from {url}...") s.download_file(url=url, to_path=to_path) logger.info(f"Successfully downloaded data from {url}.") - - -@task -def scan_sharepoint_folder( - url: str, - credentials_secret: str | None = None, - credentials: SharepointCredentials | None = None, - config_key: str | None = None, -) -> list[str]: - """Scan Sharepoint folder to get all file URLs. - - Args: - url (str): The URL of the folder to scan. - credentials_secret (str, optional): The name of the secret that stores - Sharepoint credentials. Defaults to None. 
- credentials (SharepointCredentials, optional): Sharepoint credentials. - config_key (str, optional): The key in the viadot config holding relevant - credentials. - - Raises: - MissingSourceCredentialsError: Raise when no source credentials were provided. - ValueError: Raises when URL have the wrong structure - without 'sites' segment. - - Returns: - list[str]: List of URLS. - """ - if not (credentials_secret or config_key or credentials): - raise MissingSourceCredentialsError - - credentials = credentials or get_credentials(secret_name=credentials_secret) - s = Sharepoint(credentials=credentials, config_key=config_key) - conn = s.get_connection() - parsed_url = urlparse(url) - path_parts = parsed_url.path.split("/") - if "sites" in path_parts: - site_index = ( - path_parts.index("sites") + 2 - ) # +2 to include 'sites' and the next segment - site_url = f"{parsed_url.scheme}://{parsed_url.netloc}{'/'.join(path_parts[:site_index])}" - library = "/".join(path_parts[site_index:]) - else: - message = "URL does not contain '/sites/' segment." - raise ValueError(message) - - # -> site_url = company.sharepoint.com/sites/site_name/ - # -> library = /shared_documents/folder/sub_folder/final_folder - endpoint = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{library}')/Files" - response = conn.get(endpoint) - files = response.json().get("d", {}).get("results", []) - - return [f'{site_url}/{library}{file["Name"]}' for file in files] - - -@task -def validate_and_reorder_dfs_columns(dataframes_list: list[pd.DataFrame]) -> None: - """Validate if dataframes from the list have the same column structure. - - Reorder columns to match the first DataFrame if necessary. - - Args: - dataframes_list (list[pd.DataFrame]): List containing DataFrames. - - Raises: - IndexError: If the list of DataFrames is empty. - ValueError: If DataFrames have different column structures. - """ - logger = get_run_logger() - - if not dataframes_list: - message = "The list of dataframes is empty." 
- raise IndexError(message) - - first_df_columns = dataframes_list[0].columns - - # Check that all DataFrames have the same columns - for i, df in enumerate(dataframes_list): - if set(df.columns) != set(first_df_columns): - logger.info( - f"""Columns from first dataframe: {first_df_columns}\n - Columns from DataFrame at index {i}: {df.columns} """ - ) - message = f"""DataFrame at index {i} does not have the same structure as - the first DataFrame.""" - raise ValueError(message) - if not df.columns.equals(first_df_columns): - logger.info( - f"Reordering columns for DataFrame at index {i} to match the first DataFrame." - ) - dataframes_list[i] = df.loc[:, first_df_columns] - - logger.info("DataFrames have been validated and columns reordered where necessary.") - return dataframes_list - - -@task -def get_endpoint_type_from_url(url: str) -> str: - """Get the type of the last segment of the URL. - - This function uses `get_last_segment_from_url` to parse the provided URL and - determine whether the last segment represents a file or a directory. - - Args: - url (str): The URL to a SharePoint file or directory. - - Returns: - str: A string indicating the type of the last segment, either 'file' or - 'directory'. - """ - _, endpoint_type = get_last_segment_from_url(url) - return endpoint_type