Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add BusinessCore source and Prefect tasks #1051

Merged
merged 23 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

::: viadot.orchestration.prefect.flows.bigquery_to_adls

::: viadot.orchestration.prefect.flows.business_core_to_parquet

::: viadot.orchestration.prefect.flows.cloud_for_customers_to_adls

::: viadot.orchestration.prefect.flows.cloud_for_customers_to_databricks
Expand Down
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

::: viadot.orchestration.prefect.tasks.bigquery_to_df

::: viadot.orchestration.prefect.tasks.business_core.business_core_to_df

::: viadot.orchestration.prefect.tasks.cloud_for_customers_to_df

::: viadot.orchestration.prefect.tasks.create_sql_server_table
Expand Down
2 changes: 2 additions & 0 deletions docs/references/sources/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

::: viadot.sources.bigquery.BigQuery

::: viadot.sources.business_core.BusinessCore

::: viadot.sources.cloud_for_customers.CloudForCustomers

::: viadot.sources.customer_gauge.CustomerGauge
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .azure_sql_to_adls import azure_sql_to_adls
from .bigquery_to_adls import bigquery_to_adls
from .business_core_to_parquet import business_core_to_parquet
from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
from .customer_gauge_to_adls import customer_gauge_to_adls
Expand Down Expand Up @@ -37,6 +38,7 @@
__all__ = [
"azure_sql_to_adls",
"bigquery_to_adls",
"business_core_to_parquet",
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
"customer_gauge_to_adls",
Expand Down
61 changes: 61 additions & 0 deletions src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Flow for downloading data from Business Core API to a Parquet file."""

from typing import Any, Literal

from prefect import flow

from viadot.orchestration.prefect.tasks.business_core import business_core_to_df
from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet


@flow(
    name="extract--businesscore--parquet",
    description="Extract data from Business Core API and load it into Parquet file",
    retries=1,
    retry_delay_seconds=60,
)
def business_core_to_parquet(
    path: str | None = None,
    url: str | None = None,
    filters: dict[str, Any] | None = None,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    if_empty: str = "skip",
    if_exists: Literal["append", "replace", "skip"] = "replace",
    verify: bool = True,
) -> None:
    """Extract a Business Core API view and store it as a Parquet file.

    The flow runs two steps: `business_core_to_df` downloads the data into a
    DataFrame, and `df_to_parquet` writes that DataFrame to `path`.

    Args:
        path (str, required): Path where to save the Parquet file. It is also
            forwarded to the extraction task. Defaults to None.
        url (str, required): Base url to the view in Business Core API.
            Defaults to None.
        filters (dict[str, Any], optional): Filters in form of dictionary.
            Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
            Defaults to None.
        credentials_secret (str, optional): The name of the secret that stores
            Business Core credentials. Defaults to None.
            More info on: https://docs.prefect.io/concepts/blocks/
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        if_empty (str, optional): What to do if output DataFrame is empty.
            Defaults to "skip".
        if_exists (Literal["append", "replace", "skip"], optional): Forwarded to
            `df_to_parquet`; what to do if the output already exists.
            Defaults to "replace".
        verify (bool, optional): Whether or not verify certificates while
            connecting to an API. Defaults to True.
    """
    # Step 1: pull the requested view into memory.
    extracted = business_core_to_df(
        path=path,
        url=url,
        filters=filters,
        credentials_secret=credentials_secret,
        config_key=config_key,
        if_empty=if_empty,
        verify=verify,
    )

    # Step 2: persist it; the task's result is handed back as the flow result.
    written = df_to_parquet(df=extracted, path=path, if_exists=if_exists)
    return written
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .azure_sql import azure_sql_to_df
from .bcp import bcp
from .bigquery import bigquery_to_df
from .business_core import business_core_to_df
from .cloud_for_customers import cloud_for_customers_to_df
from .customer_gauge_to_df import customer_gauge_to_df
from .databricks import df_to_databricks
Expand Down Expand Up @@ -37,6 +38,7 @@
"bcp",
"clone_repo",
"bigquery_to_df",
"business_core_to_df",
"cloud_for_customers_to_df",
"create_sql_server_table",
"customer_gauge_to_df",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/tasks/business_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Task for downloading data from Business Core API to a Parquet file."""

from typing import Any

from pandas import DataFrame
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.business_core import BusinessCore


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def business_core_to_df(
    path: str | None = None,
    url: str | None = None,
    filters: dict[str, Any] | None = None,
    credentials_secret: str | None = None,
    config_key: str | None = None,
    if_empty: str = "skip",
    verify: bool = True,
) -> DataFrame:
    """Download data from Business Core API into a pandas DataFrame.

    Args:
        path (str, optional): Forwarded unchanged to the `BusinessCore` source.
            Defaults to None.
        url (str, required): Base url to the view in Business Core API. Defaults to
            None.
        filters (dict[str, Any], optional): Filters in form of dictionary. Available
            filters: 'BucketCount','BucketNo', 'FromDate', 'ToDate'. Defaults to None.
        credentials_secret (str, optional): The name of the secret that stores Business
            Core credentials. More info on: https://docs.prefect.io/concepts/blocks/.
            Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        if_empty (str, optional): What to do if output DataFrame is empty. Defaults to
            "skip".
        verify (bool, optional): Whether or not verify certificates while connecting
            to an API. Defaults to True.

    Returns:
        DataFrame: The data returned by `BusinessCore.to_df`.

    Raises:
        MissingSourceCredentialsError: If neither `credentials_secret` nor
            `config_key` is provided.
    """
    # At least one way of locating the credentials must be given.
    if not credentials_secret and not config_key:
        raise MissingSourceCredentialsError

    logger = get_run_logger()

    # Credentials from the viadot config win; the secret is only consulted when
    # the config yields nothing.
    credentials = get_source_credentials(config_key)
    if not credentials:
        credentials = get_credentials(credentials_secret)

    source = BusinessCore(
        url=url,
        path=path,
        credentials=credentials,
        config_key=config_key,
        filters=filters,
        verify=verify,
    )
    df = source.to_df(if_empty=if_empty)

    nrows, ncols = df.shape
    logger.info(
        f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
    )

    return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ._trino import Trino
from .azure_sql import AzureSQL
from .bigquery import BigQuery
from .business_core import BusinessCore
from .cloud_for_customers import CloudForCustomers
from .customer_gauge import CustomerGauge
from .epicor import Epicor
Expand All @@ -28,6 +29,7 @@
__all__ = [
"AzureSQL",
"BigQuery",
"BusinessCore",
"CloudForCustomers",
"CustomerGauge",
"DuckDB",
Expand Down
164 changes: 164 additions & 0 deletions src/viadot/sources/business_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Source for connecting to Business Core API."""

import json
from typing import Any, Literal
from urllib.parse import urlencode

import pandas as pd
from pydantic import BaseModel, SecretStr

from viadot.config import get_source_credentials
from viadot.exceptions import APIError
from viadot.sources.base import Source
from viadot.utils import add_viadot_metadata_columns, handle_api_response


class BusinessCoreCredentials(BaseModel):
    """Credentials model for the Business Core API.

    The API uses simple username/password authentication, so both fields are
    required:
        - username: The user name to log in with.
        - password: The matching password, held as a `SecretStr`.
    """

    username: str
    password: SecretStr


class BusinessCore(Source):
    """Business Core ERP API connector.

    Logs in with a username and password to obtain a bearer token, downloads one
    of the supported views and exposes the result as a pandas DataFrame.
    """

    # Filters sent in the request body, in the order they are serialized.
    _FILTER_NAMES = ("BucketCount", "BucketNo", "FromDate", "ToDate")

    # Views (last path segment of `url`) that this connector accepts.
    _SUPPORTED_VIEWS = (
        "GetCustomerData",
        "GetItemMaster",
        "GetPendingSalesOrderData",
        "GetSalesInvoiceData",
        "GetSalesReturnDetailData",
        "GetSalesOrderData",
        "GetSalesQuotationData",
    )

    def __init__(
        self,
        url: str | None = None,
        filters: dict[str, Any] | None = None,
        credentials: dict[str, Any] | None = None,
        config_key: str = "BusinessCore",
        verify: bool = True,
        *args,
        **kwargs,
    ):
        """Create a BusinessCore connector instance.

        Args:
            url (str, optional): Base url to a view in Business Core API.
                Defaults to None.
            filters (dict[str, Any], optional): Filters in form of dictionary. Available
                filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'. Defaults to
                None, which means no filter is applied.
            credentials (dict[str, Any], optional): Credentials stored in a dictionary.
                Required credentials: username, password. Defaults to None.
            config_key (str, optional): The key in the viadot config holding relevant
                credentials. Defaults to "BusinessCore".
            verify (bool, optional): Whether or not verify certificates while connecting
                to an API. Defaults to True.
        """
        # Fall back to an empty mapping so that missing credentials surface as a
        # validation error naming the missing fields instead of a TypeError
        # raised by unpacking `None`.
        raw_creds = credentials or get_source_credentials(config_key) or {}
        validated_creds = dict(BusinessCoreCredentials(**raw_creds))

        self.url = url
        self.filters = self._clean_filters(filters)
        self.verify = verify

        super().__init__(*args, credentials=validated_creds, **kwargs)

    def generate_token(self) -> str:
        """Generate a token for the user.

        Returns:
            string: The value of 'access_token' in the login response.
        """
        url = "https://api.businesscore.ae/api/user/Login"

        # URL-encode the form body so that credentials containing characters such
        # as '&', '=', '+' or '%' are transmitted intact.
        payload = urlencode(
            {
                "grant_type": "password",
                "username": self.credentials.get("username"),
                "password": self.credentials.get("password").get_secret_value(),
                "scope": "",
            }
        )
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = handle_api_response(
            url=url,
            headers=headers,
            method="GET",
            data=payload,
            verify=self.verify,
        )

        return json.loads(response.text).get("access_token")

    @staticmethod
    def _clean_filters(filters: dict[str, str | None] | None) -> dict[str, str]:
        """Replace 'None' with '&' in a dictionary.

        Args:
            filters (dict[str, str | None] | None): Raw filters; `None` (or an
                empty dictionary) means that no filters were provided.

        Returns:
            dict[str, str]: Dictionary with filters prepared for further use. Empty
                when no filters were provided.
        """
        if not filters:
            return {}
        return {key: ("&" if val is None else val) for key, val in filters.items()}

    def _build_payload(self) -> str:
        """Serialize the filters into an 'x-www-form-urlencoded' request body."""
        fields = {}
        for name in self._FILTER_NAMES:
            value = self.filters.get(name)
            # "&" is the placeholder `_clean_filters` stores for an unset filter;
            # both the placeholder and a missing key are sent as an empty value.
            fields[name] = "" if value is None or value == "&" else value
        return urlencode(fields)

    def get_data(self) -> dict[str, Any]:
        """Obtain data from Business Core API.

        Returns:
            dict: The 'MasterDataList' entry of the JSON response downloaded from
                Business Core API.

        Raises:
            APIError: When no url was provided or the selected API view is not
                available.
        """
        if not self.url:
            error_message = "The url of a Business Core API view is required."
            raise APIError(error_message)

        # The view name is the last path segment of the url.
        view = self.url.split("/")[-1]

        if view not in self._SUPPORTED_VIEWS:
            error_message = f"View {view} currently not available."
            raise APIError(error_message)

        payload = self._build_payload()
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Bearer " + self.generate_token(),
        }
        self.logger.info("Downloading the data...")
        response = handle_api_response(
            url=self.url,
            headers=headers,
            method="GET",
            data=payload,
            verify=self.verify,
        )
        self.logger.info("Data was downloaded successfully.")
        return json.loads(response.text).get("MasterDataList")

    @add_viadot_metadata_columns
    def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame:
        """Download data into a pandas DataFrame.

        Args:
            if_empty (Literal["warn", "fail", "skip"], optional): What to do if output
                DataFrame is empty. Defaults to "skip".

        Returns:
            pd.DataFrame: DataFrame with the data.

        Raises:
            APIError: When selected API view is not available.
        """
        data = self.get_data()
        df = pd.DataFrame.from_dict(data)
        self.logger.info(
            f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows."
        )
        if df.empty:
            self._handle_if_empty(if_empty)

        return df
Loading