Merge pull request #245 from dyvenia/dev
Release 0.2.15 PR
m-paz authored Jan 12, 2022
2 parents 514d393 + 62775f4 commit b4d975e
Showing 21 changed files with 373 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -149,4 +149,4 @@ config.toml
# MKDocs
desktop.ini

.viminfo
.viminfo
40 changes: 13 additions & 27 deletions CHANGELOG.md
@@ -4,14 +4,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [Unreleased]

## [0.2.14]
## [0.2.15] - 2022-01-12
### Added
- new option to `ADLSToAzureSQL` Flow - `if_exists="delete"`
- `SQL` source: `create_table()` already handled `if_exists`; it now supports an additional `if_exists` option, `"delete"`
- `C4CToDF` and `C4CReportToDF` tasks are provided as a class instead of function

### Fixed
- appending issue within `CloudForCustomers` source
## [0.2.14] - 2021-12-01
### Fixed
- authorization issue within `CloudForCustomers` source

## [0.2.13]
## [0.2.13] - 2021-11-30
### Added
- Added support for file path to `CloudForCustomersReportToADLS` flow
- Added `flow_of_flows` list handling
@@ -24,7 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Sharepoint` and `CloudForCustomers` sources will now provide an informative `CredentialError` which is also raised early. This will make issues with input credentials immediately clear to the user.
- Removed `set_key_value` from `CloudForCustomersReportToADLS` flow

## [0.2.12]
## [0.2.12] - 2021-11-25
### Added
- Added `Sharepoint` source
- Added `SharepointToDF` task
@@ -37,7 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `df_to_parquet` task to task_utils.py
- Added `dtypes_to_json` task to task_utils.py

## [0.2.11]
## [0.2.11] - 2021-10-30

### Fixed
- `ADLSToAzureSQL` - fixed path to csv issue.
- `SupermetricsToADLS` - fixed local json path issue.
@@ -48,28 +56,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.2.9] - 2021-10-29
### Release due to CI/CD error

## [0.2.8] - 2021-10-29
### Changed
- CI/CD: `dev` image is now only published on push to the `dev` branch
- Docker:
- updated registry links to use the new `ghcr.io` domain
- `run.sh` now also accepts the `-t` option. When run in standard mode, it will only spin up the `viadot_jupyter_lab` service.
When ran with `-t dev`, it will also spin up `viadot_testing` and `viadot_docs` containers.

### Fixed
- `ADLSToAzureSQL` - fixed path parameter issue.

## [0.2.11]
### Fixed
- ADLSToAzureSQL - fixed path to csv issue.
- SupermetricsToADLS - fixed local json path issue.

## [0.2.10] - 2021-10-29
### Release due to CI/CD error

## [0.2.9] - 2021-10-29
### Release due to CI/CD error

## [0.2.8] - 2021-10-29
### Changed
- CI/CD: `dev` image is now only published on push to the `dev` branch
9 changes: 9 additions & 0 deletions docs/tutorials/cloud_for_customers.md
@@ -0,0 +1,9 @@
# How to pull CloudForCustomers data

With Viadot you can pull data from the CloudForCustomers API and save it in csv or parquet format on Azure Data Lake.
You can connect either directly, using a prepared report URL address, or to a table, adding special parameters to fetch the data.

## Pull data from CloudForCustomers and save output as a csv file on Azure Data Lake

To pull data from CloudForCustomers, we will create a flow based on `CloudForCustomersReportToADLS`:
:::viadot.flows.CloudForCustomersReportToADLS
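
A minimal usage sketch, assuming credentials are configured in the local viadot config; the report URL, channel values, and ADLS path below are illustrative placeholders rather than values from this commit:

```python
from viadot.flows import CloudForCustomersReportToADLS

# All parameter values are placeholders -- substitute your own.
flow = CloudForCustomersReportToADLS(
    name="c4c report to adls",  # flow name; also slugified into the local file name
    report_url="https://example.crm.ondemand.com/odata/reports/MyReport",  # hypothetical report URL
    env="Prod",
    channels=["CHANNEL_1"],  # filtering parameters appended to the url
    months=["01"],
    years=["2021"],
    adls_dir_path="raw/c4c",
    output_file_extension=".csv",
)
flow.run()
```

The flow builds one report URL per filter combination, pulls each report into a DataFrame, unions them, writes the result to a local file, and uploads it to ADLS via `AzureDataLakeUpload`.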
@@ -9,8 +9,8 @@ def test_cloud_for_customers_report_to_adls():
month = ["01"]
year = ["2021"]
flow = CloudForCustomersReportToADLS(
direct_url=credentials_prod["server"],
source_type="Prod",
report_url=credentials_prod["server"],
env="Prod",
channels=channels,
months=month,
years=year,
23 changes: 23 additions & 0 deletions tests/integration/tasks/test_cloud_for_customers.py
@@ -0,0 +1,23 @@
from viadot.tasks import C4CToDF, C4CReportToDF
from viadot.config import local_config


def test_c4c_to_df():
url = "http://services.odata.org/V2/Northwind/Northwind.svc/"
endpoint = "Employees"
c4c_to_df = C4CToDF()
df = c4c_to_df.run(url=url, endpoint=endpoint)
answer = df.head()

assert answer.shape[1] == 23


def test_c4c_report_to_df():
credentials = local_config.get("CLOUD_FOR_CUSTOMERS")
credentials_prod = credentials["Prod"]
report_url = credentials_prod["server"]
c4c_report_to_df = C4CReportToDF()
df = c4c_report_to_df.run(report_url=report_url, env="Prod")
answer = df.head()

assert answer.shape[0] == 5
37 changes: 36 additions & 1 deletion tests/integration/test_azuresql.py
@@ -34,7 +34,12 @@ def test_connection(azure_sql):
azure_sql.con


def test_create_table(azure_sql):
def test_table_exists(azure_sql):
result = azure_sql.exists(table=TABLE, schema=SCHEMA)
assert result == False


def test_create_table_replace(azure_sql):
dtypes = {"country": "VARCHAR(100)", "sales": "FLOAT(24)"}
result = azure_sql.create_table(
schema=SCHEMA, table=TABLE, dtypes=dtypes, if_exists="replace"
@@ -44,6 +49,36 @@ def test_create_table(azure_sql):
assert table_object_id is not None


def test_create_table_delete(azure_sql, TEST_CSV_FILE_BLOB_PATH):
insert_executed = azure_sql.bulk_insert(
schema=SCHEMA,
table=TABLE,
source_path=TEST_CSV_FILE_BLOB_PATH,
if_exists="replace",
)
assert insert_executed is True

result = azure_sql.run(f"SELECT SUM(sales) FROM {SCHEMA}.{TABLE} AS total")
assert int(result[0][0]) == 230

table_object_id_insert = azure_sql.run(
f"SELECT OBJECT_ID('{SCHEMA}.{TABLE}', 'U')"
)[0][0]

delete_executed = azure_sql.create_table(
schema=SCHEMA, table=TABLE, if_exists="delete"
)
assert delete_executed is True

table_object_id_delete = azure_sql.run(
f"SELECT OBJECT_ID('{SCHEMA}.{TABLE}', 'U')"
)[0][0]

result = azure_sql.run(f"SELECT SUM(sales) FROM {SCHEMA}.{TABLE} AS total")
assert result[0][0] is None
assert table_object_id_insert == table_object_id_delete


def test_create_table_skip_1(azure_sql):
"""Test that if_exists = "skip" works when the table doesn't exist"""
dtypes = {"country": "VARCHAR(100)", "sales": "FLOAT(24)"}
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.2.14"
assert __version__ == "0.2.15"
19 changes: 10 additions & 9 deletions tests/unit/test_cloud_for_customers.py
@@ -1,7 +1,6 @@
from viadot.sources import CloudForCustomers

import os
import numpy
import pytest


@@ -19,18 +18,20 @@ def cloud_for_customers():
os.remove(TEST_FILE_1)


def test_to_df(cloud_for_customers):
df = cloud_for_customers.to_df(fields=["EmployeeID", "FirstName", "LastName"])
assert not df.empty
assert df.shape[1] == 3
assert df.shape[0] == 2


def test_to_records(cloud_for_customers):
data = cloud_for_customers.to_records()
assert "EmployeeID" in data[0].keys()


def test_to_df(cloud_for_customers):
df = cloud_for_customers.to_df(fields=["EmployeeID", "FirstName", "LastName"])
assert type(df["EmployeeID"][0]) == numpy.int64
assert data != []
assert len(data) == 2


def test_csv(cloud_for_customers):
csv = cloud_for_customers.to_csv(
path=TEST_FILE_1, fields=["EmployeeID", "FirstName", "LastName"]
)
csv = cloud_for_customers.to_csv(path=TEST_FILE_1)
assert os.path.isfile(TEST_FILE_1) == True
2 changes: 1 addition & 1 deletion viadot/__init__.py
@@ -1 +1 @@
__version__ = "0.2.14"
__version__ = "0.2.15"
2 changes: 1 addition & 1 deletion viadot/flows/adls_to_azure_sql.py
@@ -88,7 +88,7 @@ def __init__(
dtypes: Dict[str, Any] = None,
table: str = None,
schema: str = None,
if_exists: Literal["fail", "replace", "append"] = "replace",
if_exists: Literal["fail", "replace", "append", "delete"] = "replace",
sqldb_credentials_secret: str = None,
max_download_retries: int = 5,
tags: List[str] = ["promotion"],
33 changes: 16 additions & 17 deletions viadot/flows/cloud_for_customers_report_to_adls.py
@@ -1,19 +1,25 @@
import os
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Union, Optional

import pendulum
from prefect import Flow, Task, apply_map
from prefect.backend import set_key_value

from ..utils import slugify
from ..task_utils import (
add_ingestion_metadata_task,
df_to_csv,
df_to_parquet,
union_dfs_task,
)
from ..tasks import AzureDataLakeUpload, c4c_report_to_df, c4c_to_df
from ..tasks import (
AzureDataLakeUpload,
C4CToDF,
C4CReportToDF,
)

file_to_adls_task = AzureDataLakeUpload()
c4c_report_to_df = C4CReportToDF()
c4c_to_df = C4CToDF()


class CloudForCustomersReportToADLS(Flow):
@@ -55,16 +61,16 @@ def __init__(
local_file_path (str, optional): Local destination path. Defaults to None.
output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy
to handle with parquet. Defaults to ".csv".
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to False.
adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None.
adls_file_path (str, optional): Azure Data Lake destination file path. Defaults to None.
if_empty (str, optional): What to do if the query returns no data. Defaults to "warn".
if_exists (str, optional): What to do if the local file already exists. Defaults to "replace".
skip (int, optional): Initial index value of reading row.
top (int, optional): The value of top reading row.
channels (List[str], optional): Filtering parameters passed to the url.
months (List[str], optional): Filtering parameters passed to the url.
years (List[str], optional): Filtering parameters passed to the url.
skip (int, optional): Initial index value of reading row. Defaults to 0.
top (int, optional): The value of top reading row. Defaults to 1000.
channels (List[str], optional): Filtering parameters passed to the url. Defaults to None.
months (List[str], optional): Filtering parameters passed to the url. Defaults to None.
years (List[str], optional): Filtering parameters passed to the url. Defaults to None.
"""

self.if_empty = if_empty
@@ -78,7 +84,7 @@ def __init__(
self.overwrite_adls = overwrite_adls
self.output_file_extension = output_file_extension
self.local_file_path = (
local_file_path or self.slugify(name) + self.output_file_extension
local_file_path or slugify(name) + self.output_file_extension
)
self.now = str(pendulum.now("utc"))
self.adls_dir_path = adls_dir_path
@@ -132,14 +138,9 @@ def create_url_with_fields(self, fields_list: List[str], filter_code: str) -> Li
else:
return self.report_urls_with_filters

@staticmethod
def slugify(name):
return name.replace(" ", "_").lower()

def gen_c4c(
self,
url: str,
report_url: str,
endpoint: str,
params: str,
env: str,
@@ -151,7 +152,6 @@ def gen_c4c(
env=env,
endpoint=endpoint,
params=params,
report_url=report_url,
flow=flow,
)

@@ -180,7 +180,6 @@ def gen_flow(self) -> Flow:
elif self.url:
df = self.gen_c4c(
url=self.url,
report_url=self.report_url,
env=self.env,
endpoint=self.endpoint,
params=self.params,
5 changes: 4 additions & 1 deletion viadot/sources/base.py
@@ -222,7 +222,7 @@ def create_table(
table: str,
schema: str = None,
dtypes: Dict[str, Any] = None,
if_exists: Literal["fail", "replace", "skip"] = "fail",
if_exists: Literal["fail", "replace", "skip", "delete"] = "fail",
) -> bool:
"""Create a table.
@@ -241,6 +241,9 @@
if exists:
if if_exists == "replace":
self.run(f"DROP TABLE {fqn}")
elif if_exists == "delete":
self.run(f"DELETE FROM {fqn}")
return True
elif if_exists == "fail":
raise ValueError(
"The table already exists and 'if_exists' is set to 'fail'."
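
A hedged sketch of the new `if_exists="delete"` behavior, assuming an `AzureSQL` source; the constructor arguments and the schema/table names are placeholders, since the diff does not show how the source is configured. Unlike `"replace"`, which drops and recreates the table, `"delete"` runs `DELETE FROM` and keeps the table object, as `test_create_table_delete` above verifies via `OBJECT_ID`:

```python
from viadot.sources import AzureSQL

# Constructor arguments are an assumption -- credential handling isn't shown in this diff.
azure_sql = AzureSQL(
    credentials={"server": "...", "db_name": "...", "user": "...", "password": "..."}
)

# "replace": drop the table if it exists, then recreate it with the given dtypes.
azure_sql.create_table(
    schema="sandbox",
    table="test",
    dtypes={"country": "VARCHAR(100)", "sales": "FLOAT(24)"},
    if_exists="replace",
)

# "delete": clear all rows (DELETE FROM sandbox.test) but keep the
# table definition, so its OBJECT_ID stays the same.
azure_sql.create_table(schema="sandbox", table="test", if_exists="delete")
```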