Skip to content

Commit

Permalink
Add Epicor connector, tasks, and flow (#1002)
Browse files Browse the repository at this point in the history
* 🚧 Added epicor prefect connector

* 🎨 Format code

* 🎨 Format code

* 🎨 Adjust code to pre-commit requirements

* 🎨 Adjust code to pre-commit requirements

* 🎨 Remove whitespace

* 🎨 Remove whitespace

* 🎨 Remove whitespace

* πŸ”₯ removed credentials dict

* πŸ› Fix a typo in import

* πŸ“ Updated docstrings

* 🎨 Moved `filters_xml`

* βœ… Add test for each epicor xml schema

* πŸ“ Update doctring with example

* 🚧 Added epicor prefect connector

* 🎨 Format code

* 🎨 Format code

* 🎨 Adjust code to pre-commit requirements

* 🎨 Adjust code to pre-commit requirements

* 🎨 Remove whitespace

* 🎨 Remove whitespace

* 🎨 Remove whitespace

* πŸ”₯ removed credentials dict

* πŸ“ Updated docstrings

* 🎨 Moved `filters_xml`

* βœ… Add test for each epicor xml schema

* πŸ“ Update doctring with example

* πŸ”₯ remove unused code

* πŸ”– Bump viadot version

---------

Co-authored-by: angelika233 <[email protected]>
  • Loading branch information
angelika233 and angelika233 authored Sep 2, 2024
1 parent c8fe0e4 commit b0c4dac
Show file tree
Hide file tree
Showing 11 changed files with 781 additions and 3 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "viadot2"
version = "2.1.14"
version = "2.1.15"
description = "A simple data ingestion library to guide data flows from some places to other places."
authors = [
{ name = "acivitillo", email = "[email protected]" },
Expand Down Expand Up @@ -32,6 +32,8 @@ dependencies = [
"pyarrow>=10.0, <10.1.0",
# numpy>=2.0 is not compatible with the old pyarrow v10.x.
"numpy>=1.23.4, <2.0",
"defusedxml>=0.7.1",

]
requires-python = ">=3.10"
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
aiohappyeyeballs==2.4.0
Expand Down Expand Up @@ -109,6 +108,7 @@ decorator==5.1.1
defusedxml==0.7.1
# via cairosvg
# via nbconvert
# via viadot2
dnspython==2.6.1
# via email-validator
docker==7.1.0
Expand Down
3 changes: 2 additions & 1 deletion requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
aiolimiter==1.1.0
Expand Down Expand Up @@ -64,6 +63,8 @@ cryptography==43.0.0
# via prefect
dateparser==1.2.0
# via prefect
defusedxml==0.7.1
# via viadot2
dnspython==2.6.1
# via email-validator
docker==7.1.0
Expand Down
4 changes: 4 additions & 0 deletions src/viadot/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class DBDataAccessError(Exception):
pass


class DataRangeError(Exception):
    """Error related to an invalid data (date) range.

    NOTE(review): semantics inferred from the name only — this hunk does not
    show where it is raised; confirm against the sources that use it.
    """

    pass


class TableDoesNotExistError(Exception):
def __init__(
self,
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .duckdb_to_parquet import duckdb_to_parquet
from .duckdb_to_sql_server import duckdb_to_sql_server
from .duckdb_transform import duckdb_transform
from .epicor_to_parquet import epicor_to_parquet
from .exchange_rates_to_adls import exchange_rates_to_adls
from .exchange_rates_to_databricks import exchange_rates_to_databricks
from .genesys_to_adls import genesys_to_adls
Expand All @@ -29,6 +30,7 @@
"duckdb_to_parquet",
"duckdb_to_sql_server",
"duckdb_transform",
"epicor_to_parquet",
"exchange_rates_to_adls",
"exchange_rates_to_databricks",
"genesys_to_adls",
Expand Down
93 changes: 93 additions & 0 deletions src/viadot/orchestration/prefect/flows/epicor_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Flows for downloading data from Epicor Prelude API to Parquet file."""

from typing import Literal

from prefect import flow

from viadot.orchestration.prefect.tasks import epicor_to_df
from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet


@flow(
    name="extract--epicor--parquet",
    description="Extract data from Epicor Prelude API and load it into Parquet file",
    retries=1,
    retry_delay_seconds=60,
)
def epicor_to_parquet(
    path: str,
    base_url: str,
    filters_xml: str,
    if_exists: Literal["append", "replace", "skip"] = "replace",
    validate_date_filter: bool = True,
    start_date_field: str = "BegInvoiceDate",
    end_date_field: str = "EndInvoiceDate",
    epicor_credentials_secret: str | None = None,
    epicor_config_key: str | None = None,
) -> None:
    """Download data from Epicor Prelude API and load it into a Parquet file.

    Args:
        path (str): Path to the Parquet file where the data will be stored.
        base_url (str, required): Base URL to Epicor.
        filters_xml (str, required): Filters in the form of XML. The date filter
            is required.
        if_exists (Literal["append", "replace", "skip"], optional): What to do
            if the file already exists. Defaults to "replace".
        validate_date_filter (bool, optional): Whether or not to validate the XML
            date filters. Defaults to True.
        start_date_field (str, optional): The name of the filters field containing
            the start date. Defaults to "BegInvoiceDate".
        end_date_field (str, optional): The name of the filters field containing
            the end date. Defaults to "EndInvoiceDate".
        epicor_credentials_secret (str, optional): The name of the secret storing
            the credentials. Defaults to None.
            More info on: https://docs.prefect.io/concepts/blocks/
        epicor_config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.

    Examples:
        >>> epicor_to_parquet(
        >>>     path = "my_parquet.parquet",
        >>>     base_url = "/api/data/import/ORDER.QUERY",
        >>>     filters_xml = "<OrderQuery>
        >>>         <QueryFields>
        >>>             <CompanyNumber>001</CompanyNumber>
        >>>             <CustomerNumber></CustomerNumber>
        >>>             <SellingWarehouse></SellingWarehouse>
        >>>             <OrderNumber></OrderNumber>
        >>>             <WrittenBy></WrittenBy>
        >>>             <CustomerPurchaseOrderNumber></CustomerPurchaseOrderNumber>
        >>>             <CustomerReleaseNumber></CustomerReleaseNumber>
        >>>             <CustomerJobNumber></CustomerJobNumber>
        >>>             <InvoiceNumber></InvoiceNumber>
        >>>             <EcommerceId></EcommerceId>
        >>>             <EcommerceOrderNumber></EcommerceOrderNumber>
        >>>             <QuoteNumber></QuoteNumber>
        >>>             <BegInvoiceDate>{yesterday}</BegInvoiceDate>
        >>>             <EndInvoiceDate>{yesterday}</EndInvoiceDate>
        >>>             <SortXMLTagName></SortXMLTagName>
        >>>             <SortMethod></SortMethod>
        >>>             <RecordCount></RecordCount>
        >>>             <RecordCountPage></RecordCountPage>
        >>>         </QueryFields>
        >>>     </OrderQuery>",
        >>>     epicor_config_key = "epicor"
        >>> )
    """
    df = epicor_to_df(
        base_url=base_url,
        filters_xml=filters_xml,
        validate_date_filter=validate_date_filter,
        start_date_field=start_date_field,
        end_date_field=end_date_field,
        credentials_secret=epicor_credentials_secret,
        config_key=epicor_config_key,
    )

    # The flow is annotated `-> None`: df_to_parquet writes the file as a side
    # effect, so its task result is deliberately not returned (the original
    # `return df_to_parquet(...)` contradicted the annotation).
    df_to_parquet(
        df=df,
        path=path,
        if_exists=if_exists,
    )
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .databricks import df_to_databricks
from .dbt import dbt_task
from .duckdb import duckdb_query
from .epicor import epicor_to_df
from .exchange_rates import exchange_rates_to_df
from .genesys import genesys_to_df
from .git import clone_repo
Expand All @@ -32,6 +33,7 @@
"df_to_databricks",
"dbt_task",
"duckdb_query",
"epicor_to_df",
"exchange_rates_to_df",
"genesys_to_df",
"clone_repo",
Expand Down
64 changes: 64 additions & 0 deletions src/viadot/orchestration/prefect/tasks/epicor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Task for downloading data from Epicor Prelude API."""

import pandas as pd
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.epicor import Epicor


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def epicor_to_df(
    base_url: str,
    filters_xml: str,
    validate_date_filter: bool = True,
    start_date_field: str = "BegInvoiceDate",
    end_date_field: str = "EndInvoiceDate",
    credentials_secret: str | None = None,
    config_key: str | None = None,
) -> pd.DataFrame:
    """Load the data from Epicor Prelude API into a pandas DataFrame.

    Args:
        base_url (str, required): Base URL to Epicor.
        filters_xml (str, required): Filters in the form of XML. The date filter
            is required.
        validate_date_filter (bool, optional): Whether or not to validate the XML
            date filters. Defaults to True.
        start_date_field (str, optional): The name of the filters field containing
            the start date. Defaults to "BegInvoiceDate".
        end_date_field (str, optional): The name of the filters field containing
            the end date. Defaults to "EndInvoiceDate".
        credentials_secret (str, optional): The name of the secret storing
            the credentials. Defaults to None.
            More info on: https://docs.prefect.io/concepts/blocks/
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.

    Raises:
        MissingSourceCredentialsError: If neither `credentials_secret` nor
            `config_key` is provided.

    Returns:
        pd.DataFrame: The data downloaded from the Epicor Prelude API.
    """
    if not (credentials_secret or config_key):
        raise MissingSourceCredentialsError

    logger = get_run_logger()

    # Prefer the local viadot config; fall back to the Prefect secret block.
    credentials = get_source_credentials(config_key) or get_credentials(
        credentials_secret
    )
    epicor = Epicor(
        credentials=credentials,
        base_url=base_url,
        validate_date_filter=validate_date_filter,
        start_date_field=start_date_field,
        end_date_field=end_date_field,
    )
    df = epicor.to_df(filters_xml=filters_xml)
    nrows, ncols = df.shape

    # Lazy %-style args: the message is only formatted if this level is emitted.
    logger.info(
        "Successfully downloaded %s rows and %s columns of data to a DataFrame.",
        nrows,
        ncols,
    )
    return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .cloud_for_customers import CloudForCustomers
from .duckdb import DuckDB
from .epicor import Epicor
from .exchange_rates import ExchangeRates
from .genesys import Genesys
from .hubspot import Hubspot
Expand All @@ -17,6 +18,7 @@

__all__ = [
"CloudForCustomers",
"Epicor",
"ExchangeRates",
"Genesys",
"Outlook",
Expand Down
Loading

0 comments on commit b0c4dac

Please sign in to comment.