
Merge pull request #284 from dyvenia/dev
Release 0.3.0
m-paz authored Feb 16, 2022
2 parents ff0f515 + 661841b commit da3a5a9
Showing 57 changed files with 2,193 additions and 67 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -150,3 +150,6 @@ config.toml
desktop.ini

.viminfo

# SAP RFC lib
sap_netweaver_rfc
29 changes: 28 additions & 1 deletion CHANGELOG.md
@@ -3,18 +3,45 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.3.0] - 2022-02-16
### Added
- new source `SAPRFC` for connecting with SAP using the `pyRFC` library (requires `pyrfc` as well as the SAP NW RFC library, which can be downloaded [here](https://support.sap.com/en/product/connectors/nwrfcsdk.html))
- new source `DuckDB` for connecting with the `DuckDB` database
- new task `SAPRFCToDF` for loading data from SAP to a pandas DataFrame
- new tasks, `DuckDBQuery` and `DuckDBCreateTableFromParquet`, for interacting with DuckDB
- new flow `SAPToDuckDB` for moving data from SAP to DuckDB
- new task `CheckColumnOrder`
- documentation for the C4C connection with `url` and `report_url`
- `SQLiteInsert` now checks whether the DataFrame is empty or the object is not a DataFrame
- KeyVault support in the `SharepointToDF` task
- KeyVault support in the `CloudForCustomers` tasks

### Changed
- pinned the Prefect version to 0.15.11
- `df_to_csv` now creates directories if they don't exist
- `ADLSToAzureSQL` now removes unnecessary `\t` characters from CSV column data

### Fixed
- fixed an issue with DuckDB calls seeing the initial database snapshot instead of the updated state (#282)
- optimized the C4C connection with `url` and `report_url`
- the column mapper in the C4C source

## [0.2.15] - 2022-01-12
### Added
- new option for the `ADLSToAzureSQL` flow: `if_exists="delete"`
- `SQL` source: `create_table()` now handles an additional `if_exists` option
- `C4CToDF` and `C4CReportToDF` tasks are now classes instead of functions


### Fixed
- an appending issue within the `CloudForCustomers` source
- an early return bug in the `to_df` method of `UKCarbonIntensity`


## [0.2.14] - 2021-12-01

### Fixed
- an authorization issue within the `CloudForCustomers` source

14 changes: 12 additions & 2 deletions README.md
@@ -115,10 +115,20 @@ However, when developing, the easiest way is to use the provided Jupyter Lab con

Please follow the standards and best practices used within the library (e.g., when adding tasks, see how other tasks are constructed). For any questions, please reach out to us here on GitHub.


### Style guidelines
- the code should be formatted with Black using default settings (the easiest way is to use the VSCode extension)
- commit messages should:
  - begin with an emoji
  - start with one of the following capitalized verbs immediately after the summary emoji: "Added", "Updated", "Removed", "Fixed", "Renamed", or, occasionally, another verb such as "Upgraded" or "Downgraded" that fits the situation
  - contain a useful description of what the commit is doing

## Set up Black for development in VSCode
Your code should be formatted with Black before you contribute. To set up Black in Visual Studio Code, follow the instructions below.
1. Install `black` in your environment by running the following in the terminal:
```
pip install black
```
2. Open the settings: click the gear icon in the bottom-left corner and select `Settings`, or press Ctrl+,.
3. Find the `Format On Save` setting and check the box.
4. Find the `Python Formatting Provider` setting and select "black" from the drop-down list.
5. Your code should now auto-format on save.
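
Steps 3-4 can also be set directly in VS Code's `settings.json` — a minimal sketch using the standard setting keys:

```
{
    // Format files automatically on save (step 3)
    "editor.formatOnSave": true,
    // Use Black as the Python formatter (step 4)
    "python.formatting.provider": "black"
}
```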
11 changes: 10 additions & 1 deletion docker/Dockerfile
@@ -1,4 +1,4 @@
FROM prefecthq/prefect:0.15.11-python3.8


# Add user
@@ -11,10 +11,12 @@ RUN useradd --create-home viadot && \
RUN groupadd docker && \
    usermod -aG docker viadot


# Release File Error
# https://stackoverflow.com/questions/63526272/release-file-is-not-valid-yet-docker
RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until


# System packages
RUN apt update && yes | apt install vim unixodbc-dev build-essential \
    curl python3-dev libboost-all-dev libpq-dev graphviz python3-gi sudo git
@@ -36,7 +38,14 @@ RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \

COPY docker/odbcinst.ini /etc


# Python env

# This one's needed for the SAP RFC connector.
# It must be installed here as the SAP package does not define its dependencies,
# so `pip install pyrfc` breaks if all deps are not already present.
RUN pip install cython==0.29.24

WORKDIR /code
COPY requirements.txt /code/
RUN pip install --upgrade pip
4 changes: 3 additions & 1 deletion docs/references/api_sources.md
@@ -2,4 +2,6 @@

::: viadot.sources.uk_carbon_intensity.UKCarbonIntensity

::: viadot.sources.supermetrics.Supermetrics

::: viadot.sources.cloud_for_customers.CloudForCustomers
3 changes: 1 addition & 2 deletions docs/references/flows_library.md
@@ -8,5 +8,4 @@
::: viadot.flows.azure_sql_transform.AzureSQLTransform
::: viadot.flows.supermetrics_to_adls.SupermetricsToADLS
::: viadot.flows.supermetrics_to_azure_sql.SupermetricsToAzureSQL


::: viadot.flows.cloud_for_customers_report_to_adls.CloudForCustomersReportToADLS
6 changes: 4 additions & 2 deletions docs/references/task_library.md
@@ -9,7 +9,6 @@
::: viadot.tasks.azure_data_lake.AzureDataLakeCopy
::: viadot.tasks.azure_data_lake.AzureDataLakeList


::: viadot.tasks.azure_key_vault.AzureKeyVaultSecret
::: viadot.tasks.azure_key_vault.CreateAzureKeyVaultSecret
::: viadot.tasks.azure_key_vault.DeleteAzureKeyVaultSecret
@@ -29,4 +28,7 @@
::: viadot.tasks.supermetrics.SupermetricsToDF

::: viadot.task_utils.add_ingestion_metadata_task
::: viadot.task_utils.get_latest_timestamp_file_path

::: viadot.tasks.cloud_for_customers.C4CToDF
::: viadot.tasks.cloud_for_customers.C4CReportToDF
8 changes: 8 additions & 0 deletions docs/tutorials/sharepoint.md
@@ -0,0 +1,8 @@
# How to pull an Excel file from SharePoint

With viadot you can download an Excel file from SharePoint and then upload it to Azure Data Lake. You can set a URL to the file on SharePoint and specify parameters such as the path to the local Excel file, the number of rows, and the sheet number to be extracted.

## Pull data from SharePoint and save the output as a CSV file on Azure Data Lake

To pull an Excel file from SharePoint, we create a flow based on `SharepointToADLS`:
::: viadot.flows.SharepointToADLS
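
A minimal sketch of such a flow is shown below. The parameter names here are assumptions modeled on other viadot flows — check the `SharepointToADLS` reference above for the actual signature:

```python
from viadot.flows import SharepointToADLS

# Hypothetical parameter names -- verify against the actual SharepointToADLS signature.
flow = SharepointToADLS(
    name="Sharepoint to ADLS",
    url_to_file="https://{tenant}.sharepoint.com/sites/{site}/Shared%20Documents/file.xlsx",
    path_to_file="file.xlsx",  # local path for the downloaded Excel file
    nrows=500,                 # number of rows to extract
    sheet_number=0,            # which sheet to read
    adls_dir_path="raw/sharepoint",
)

flow.run()
```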
6 changes: 4 additions & 2 deletions requirements.txt
@@ -7,7 +7,7 @@ mkdocs-material==8.0.1
mkdocs==1.2.3
mkdocstrings==0.16.2
pandas==1.3.4
prefect[viz]==0.15.11
pyarrow==6.0.1
pyodbc==4.0.32
pytest==6.2.5
@@ -24,4 +24,6 @@ PyGithub==1.55
Shapely==1.8.0
imagehash==4.2.1
visions==0.7.4
sharepy==1.3.0
sql-metadata==2.3.0
duckdb==0.3.1
6 changes: 6 additions & 0 deletions setup.py
@@ -13,6 +13,11 @@ def get_version(package: str):
with open("README.md", "r") as fh:
    long_description = fh.read()


extras = {
"sap": ["pyrfc==2.5.0", "sql-metadata==2.3.0"],
}

setuptools.setup(
name="viadot",
version=get_version("viadot"),
@@ -22,6 +27,7 @@ def get_version(package: str):
    long_description_content_type="text/markdown",
    url="https://github.com/dyvenia/viadot",
    packages=setuptools.find_packages(),
    extras_require=extras,
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
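
With the `sap` extra defined in `extras_require`, the optional SAP dependencies can be installed on demand, e.g. from a local checkout (note that `pyrfc` additionally requires the SAP NW RFC library to be present on the system):

```
pip install .[sap]
```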
12 changes: 12 additions & 0 deletions tests/conftest.py
@@ -19,6 +19,11 @@ def TEST_PARQUET_FILE_PATH():
return "test_data_countries.parquet"


@pytest.fixture(scope="session")
def TEST_PARQUET_FILE_PATH_2():
return "test_data_countries_2.parquet"


@pytest.fixture(scope="session")
def TEST_CSV_FILE_BLOB_PATH():
return "tests/test.csv"
Expand All @@ -44,3 +49,10 @@ def create_test_parquet_file(DF, TEST_PARQUET_FILE_PATH):
    DF.to_parquet(TEST_PARQUET_FILE_PATH, index=False)
    yield
    os.remove(TEST_PARQUET_FILE_PATH)


@pytest.fixture(scope="session", autouse=True)
def create_test_parquet_file_2(DF, TEST_PARQUET_FILE_PATH_2):
    DF.to_parquet(TEST_PARQUET_FILE_PATH_2, index=False)
    yield
    os.remove(TEST_PARQUET_FILE_PATH_2)
11 changes: 11 additions & 0 deletions tests/integration/flows/test_adls_to_azure_sql.py
@@ -1,4 +1,6 @@
import pandas as pd
from viadot.flows import ADLSToAzureSQL
from viadot.flows.adls_to_azure_sql import df_to_csv_task


def test_get_promoted_adls_path_csv_file():
@@ -44,3 +46,12 @@ def test_get_promoted_adls_path_dir_starts_with_slash():
    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
    promoted_path = flow.get_promoted_path(env="conformed")
    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_df_to_csv_task():
    d = {"col1": ["rat", "\tdog"], "col2": ["cat", 4]}
    df = pd.DataFrame(data=d)
    # The raw DataFrame contains a "\t" character.
    assert df["col1"].astype(str).str.contains("\t")[1]
    task = df_to_csv_task
    task.run(df, "result.csv")
    # The task should have stripped the "\t" characters from the DataFrame.
    assert not df["col1"].astype(str).str.contains("\t")[1]
47 changes: 47 additions & 0 deletions tests/integration/flows/test_sap_to_duckdb.py
@@ -0,0 +1,47 @@
import os

from viadot.config import local_config

try:
    import pyrfc
except ModuleNotFoundError:
    # Re-raise so the test fails loudly when the SAP RFC dependency is missing.
    raise

from viadot.flows import SAPToDuckDB

sap_test_creds = local_config.get("SAP").get("TEST")
duckdb_creds = {"database": "test1.duckdb"}


def test_sap_to_duckdb():
    flow = SAPToDuckDB(
        name="SAPToDuckDB flow test",
        query="""
            select
                CLIENT as client
                ,KNUMV as number_of_the_document_condition
                ,KPOSN as condition_item_number
                ,STUNR as step_number
                ,KAPPL as application
            from PRCD_ELEMENTS
            where KNUMV = '2003393196'
                and KPOSN = '000001'
                or STUNR = '570'
                and CLIENT = '009'
            limit 3
        """,
        schema="main",
        table="test",
        local_file_path="local.parquet",
        table_if_exists="replace",
        sap_credentials=sap_test_creds,
        duckdb_credentials=duckdb_creds,
    )

    result = flow.run()
    assert result.is_successful()

    task_results = result.result.values()
    assert all(task_result.is_successful() for task_result in task_results)

    os.remove("test1.duckdb")
71 changes: 70 additions & 1 deletion tests/integration/tasks/test_azure_sql.py
@@ -1,6 +1,9 @@
import logging
import pandas as pd
import pytest
from viadot.exceptions import ValidationError

from viadot.tasks import AzureSQLCreateTable, AzureSQLDBQuery, CheckColumnOrder

logger = logging.getLogger(__name__)

@@ -63,3 +66,69 @@ def test_azure_sql_run_drop_query():
"""
exists = bool(sql_query_task.run(list_table_info_query))
assert not exists


def test_check_column_order_append_same_col_number(caplog):
    create_table_task = AzureSQLCreateTable()
    with caplog.at_level(logging.INFO):
        create_table_task.run(
            schema=SCHEMA,
            table=TABLE,
            dtypes={"id": "INT", "name": "VARCHAR(25)", "street": "VARCHAR(25)"},
            if_exists="replace",
        )
    assert "Successfully created table sandbox" in caplog.text

    data = {"id": [1], "street": ["Green"], "name": ["Tom"]}
    df = pd.DataFrame(data)

    check_column_order = CheckColumnOrder()
    with caplog.at_level(logging.WARNING):
        check_column_order.run(table=TABLE, if_exists="append", df=df)

    assert (
        "Detected column order difference between the CSV file and the table. Reordering..."
        in caplog.text
    )


def test_check_column_order_append_diff_col_number(caplog):
    create_table_task = AzureSQLCreateTable()
    with caplog.at_level(logging.INFO):
        create_table_task.run(
            schema=SCHEMA,
            table=TABLE,
            dtypes={"id": "INT", "name": "VARCHAR(25)", "street": "VARCHAR(25)"},
            if_exists="replace",
        )
    assert "Successfully created table sandbox" in caplog.text

    data = {"id": [1], "age": ["40"], "street": ["Green"], "name": ["Tom"]}
    df = pd.DataFrame(data)

    check_column_order = CheckColumnOrder()
    with pytest.raises(
        ValidationError,
        match=r"Detected discrepancies in number of columns or different column names between the CSV file and the SQL table!",
    ):
        check_column_order.run(table=TABLE, if_exists="append", df=df)


def test_check_column_order_replace(caplog):
    create_table_task = AzureSQLCreateTable()
    with caplog.at_level(logging.INFO):
        create_table_task.run(
            schema=SCHEMA,
            table=TABLE,
            dtypes={"id": "INT", "name": "VARCHAR(25)", "street": "VARCHAR(25)"},
            if_exists="replace",
        )
    assert "Successfully created table sandbox" in caplog.text

    data = {"id": [1], "street": ["Green"], "name": ["Tom"]}
    df = pd.DataFrame(data)

    check_column_order = CheckColumnOrder()
    with caplog.at_level(logging.INFO):
        check_column_order.run(table=TABLE, if_exists="replace", df=df)
    assert "The table will be replaced." in caplog.text
10 changes: 9 additions & 1 deletion tests/integration/tasks/test_cloud_for_customers.py
@@ -1,5 +1,6 @@
from viadot.tasks import C4CToDF, C4CReportToDF
from viadot.config import local_config
from prefect.tasks.secrets import PrefectSecret


def test_c4c_to_df():
@@ -8,7 +9,6 @@ def test_c4c_to_df():
    c4c_to_df = C4CToDF()
    df = c4c_to_df.run(url=url, endpoint=endpoint)
    answer = df.head()
    assert answer.shape[1] == 23


@@ -21,3 +21,11 @@ def test_c4c_report_to_df():
    answer = df.head()

    assert answer.shape[0] == 5


def test_c4c_to_df_kv():
    task = C4CToDF()
    credentials_secret = PrefectSecret("C4C_KV").run()
    res = task.run(credentials_secret=credentials_secret, endpoint="ActivityCollection")
    answer = res.head()
    assert answer.shape[1] == 19