✨ Add SAPBW source and Prefect tasks (#1053)
* ✨ created sap_bw source file.

* ✨ created sap_bw task file.

* ✨ created sap_bw flow file.

* ✅ created integration test file.

* 🎨 added sap bw to init files.

* 📝 updated some comment lines.

* adding ruff formatter

* Fix import

* 🐛 Fix imports

* 🐛 Fix imports

* 🎨 Adjust class name

* Fix imports

* 🎨 Format code

* remove credentials from flow and task

* 🎨 Make mdx_query required

* 🚚 move test file

* 📝 Update docstring

* Update docstring

---------

Co-authored-by: Diego-H-S <[email protected]>
Co-authored-by: angelika233 <[email protected]>
3 people authored Sep 30, 2024
1 parent 105f461 commit 16cae13
Showing 7 changed files with 523 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
@@ -16,6 +16,7 @@
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .salesforce_to_adls import salesforce_to_adls
from .sap_bw_to_adls import sap_bw_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sftp_to_adls import sftp_to_adls
@@ -47,6 +48,7 @@
"mindful_to_adls",
"outlook_to_adls",
"salesforce_to_adls",
"sap_bw_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sftp_to_adls",
60 changes: 60 additions & 0 deletions src/viadot/orchestration/prefect/flows/sap_bw_to_adls.py
@@ -0,0 +1,60 @@
"""Task to download data from SAP BW API into a Pandas DataFrame."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sap_bw_to_df


@flow(
    name="SAP BW extraction to ADLS",
    description="Extract data from SAP BW and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def sap_bw_to_adls(
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mdx_query: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
) -> None:
"""Flow for downloading data from SAP BW API to Azure Data Lake.
Args:
config_key (Optional[str], optional): The key in the viadot config holding
relevant credentials. Defaults to None.
azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret where credentials are stored. Defaults to None.
mdx_query (str, optional): The MDX query to be passed to connection.
mapping_dict (dict[str, Any], optional): Dictionary with original and new
column names. Defaults to None.
adls_azure_key_vault_secret (str, optional): The name of the Azure Key.
Defaults to None.
adls_config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
adls_path (str, optional): Azure Data Lake destination folder/catalog path.
Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to False.
"""
    data_frame = sap_bw_to_df(
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
        mdx_query=mdx_query,
        mapping_dict=mapping_dict,
    )

    return df_to_adls(
        df=data_frame,
        path=adls_path,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
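
For orientation, a minimal invocation sketch follows; the Key Vault secret names, MDX query, and ADLS path are hypothetical placeholders, not values taken from this commit.

from viadot.orchestration.prefect.flows import sap_bw_to_adls

# All names below are placeholders; substitute your own secrets, query,
# and lake path.
sap_bw_to_adls(
    azure_key_vault_secret="sap-bw-credentials",  # hypothetical secret name
    mdx_query=(
        "SELECT [Measures].[0QUANTITY] ON COLUMNS, "
        "NON EMPTY [0MATERIAL].MEMBERS ON ROWS "
        "FROM [$0D_NW_C01]"  # hypothetical InfoCube
    ),
    mapping_dict={"[0MATERIAL].[LEVEL01].[MEMBER_CAPTION]": "material"},
    adls_azure_key_vault_secret="adls-credentials",  # hypothetical secret name
    adls_path="raw/sap_bw/extract.parquet",
    adls_path_overwrite=True,
)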
6 changes: 4 additions & 2 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -20,6 +20,7 @@
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .salesforce import salesforce_to_df
from .sap_bw import sap_bw_to_df
from .sap_rfc import sap_rfc_to_df
from .sftp import sftp_list, sftp_to_df
from .sharepoint import sharepoint_download_file, sharepoint_to_df
@@ -32,12 +33,12 @@
"bcp",
"clone_repo",
"cloud_for_customers_to_df",
"customer_gauge_to_df",
"df_to_databricks",
"create_sql_server_table",
"customer_gauge_to_df",
"dbt_task",
"df_to_adls",
"df_to_databricks",
"df_to_databricks",
"df_to_minio",
"df_to_redshift_spectrum",
"duckdb_query",
@@ -51,6 +52,7 @@
"outlook_to_df",
"s3_upload_file",
"salesforce_to_df",
"sap_bw_to_df",
"sap_rfc_to_df",
"sftp_list",
"sftp_to_df",
54 changes: 54 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sap_bw.py
@@ -0,0 +1,54 @@
"""Task to download data from SAP BW into a Pandas DataFrame."""

import contextlib
from typing import Any

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials


with contextlib.suppress(ImportError):
    from viadot.sources import SAPBW


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sap_bw_to_df(
    mdx_query: str,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    mapping_dict: dict[str, Any] | None = None,
) -> pd.DataFrame:
"""Task to download data from SAP BW to DataFrame.
Args:
mdx_query (str, required): The MDX query to be passed to connection.
config_key (Optional[str], optional): The key in the viadot config holding
relevant credentials. Defaults to None.
azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret where credentials are stored. Defaults to None.
mapping_dict (dict[str, Any], optional): Dictionary with original and new
column names. Defaults to None.
Raises:
MissingSourceCredentialsError: If none credentials have been provided.
Returns:
pd.DataFrame: The response data as a Pandas Data Frame.
"""
    if not (azure_key_vault_secret or config_key):
        raise MissingSourceCredentialsError

    # Pull credentials from Key Vault only when no config key is given;
    # otherwise the source reads them from the viadot config. The ternary
    # guarantees `credentials` is always bound.
    credentials = get_credentials(azure_key_vault_secret) if not config_key else None

    sap_bw = SAPBW(
        credentials=credentials,
        config_key=config_key,
    )
    sap_bw.api_connection(mdx_query=mdx_query)

    return sap_bw.to_df(mapping_dict=mapping_dict)
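
Since Prefect tasks run inside flows, a usage sketch looks like the following; the config key and MDX query are assumed placeholders.

from prefect import flow

from viadot.orchestration.prefect.tasks import sap_bw_to_df


@flow(log_prints=True)
def preview_sap_bw() -> None:
    # Either config_key or azure_key_vault_secret must be set; with
    # neither, the task raises MissingSourceCredentialsError.
    df = sap_bw_to_df(
        mdx_query="SELECT ... FROM [$0D_NW_C01]",  # placeholder MDX query
        config_key="sap_bw",  # hypothetical viadot config entry
    )
    print(df.head())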
6 changes: 4 additions & 2 deletions src/viadot/sources/__init__.py
@@ -30,10 +30,10 @@
"Hubspot",
"Mediatool",
"Mindful",
"Sftp",
"Outlook",
"SQLServer",
"Salesforce",
"Sftp",
"Sharepoint",
"Supermetrics",
"SupermetricsCredentials", # pragma: allowlist-secret
@@ -61,9 +61,11 @@

__all__.extend(["MinIO"])
if find_spec("pyrfc"):
from viadot.sources.sap_bw import SAPBW # noqa: F401
from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401

__all__.extend(["SAPRFC", "SAPRFCV2"])
__all__.extend(["SAPBW", "SAPRFC", "SAPRFCV2"])

if find_spec("pyspark"):
from viadot.sources.databricks import Databricks # noqa: F401

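Because the SAPBW export is gated on pyrfc being importable, downstream code that must run with or without the SAP extras can apply the same guard; a small sketch, assuming SAP BW extraction is simply skipped when pyrfc is absent:

from importlib.util import find_spec

if find_spec("pyrfc"):
    # viadot.sources re-exports SAPBW only when pyrfc is installed.
    from viadot.sources import SAPBW
else:
    SAPBW = None  # e.g. skip SAP BW extraction where pyrfc is unavailable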