Merge branch 'dev' into feature/sharepoint_multiflows_control
marcinpurtak authored Dec 6, 2023
2 parents 800b628 + 8f447aa commit 82308bb
Showing 12 changed files with 63 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 ### Added
+- Added tests for the new credential-passing logic in SAPRFC and SAPRFCV2
 - Added new params for mapping and reordering DataFrame for `Genesys` task and flow.
 - Tasks to search for logs in the flow
 - Tasks to find flow ID
@@ -14,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 - if_no_data_returned added for the SharePoint list flow, which can fail or warn when no data is returned, or skip (continue) execution in the old way
+- Changed __init__ in the SAPRFC and SAPRFCV2 source classes to raise a warning in Prefect when credentials are taken from DEV.
 
 ## [0.4.22] - 2023-11-15
 ### Added
2 changes: 1 addition & 1 deletion tests/integration/flows/test_sharepoint_to_adls.py
@@ -5,7 +5,7 @@
 import pendulum
 import pytest
 from prefect.tasks.secrets import PrefectSecret
-from viadot.flows import SharepointToADLS, SharepointListToADLS
+from viadot.flows import SharepointListToADLS, SharepointToADLS
 from viadot.tasks import AzureDataLakeRemove
 
 ADLS_FILE_NAME = str(pendulum.now("utc")) + ".csv"
16 changes: 16 additions & 0 deletions tests/integration/test_sap_rfc.py
@@ -187,3 +187,19 @@ def test___build_pandas_filter_query_v2():
         sap2._build_pandas_filter_query(sap2.client_side_filters)
         == "thirdlongcolname == 01234"
     ), sap2._build_pandas_filter_query(sap2.client_side_filters)
+
+
+def test_default_credentials_warning_SAPRFC(caplog):
+    _ = SAPRFC()
+    assert (
+        "Your credentials will use the DEV environment. If you would like to use a different one, please specify it"
+        in caplog.text
+    )
+
+
+def test_default_credentials_warning_SAPRFCV2(caplog):
+    _ = SAPRFCV2()
+    assert (
+        "Your credentials will use the DEV environment. If you would like to use a different one, please specify it"
+        in caplog.text
+    )
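
The two new tests above only cover the fallback path, asserting that instantiating SAPRFC or SAPRFCV2 without credentials logs the DEV warning. A complementary check — not part of this commit, and assuming the classes accept an explicit credentials dict as the sap_rfc.py change further down suggests — could confirm the warning is absent when credentials are passed directly:

def test_no_warning_with_explicit_credentials(caplog):
    # Hypothetical credential keys; any non-None dict should skip the DEV fallback.
    fake_credentials = {"user": "example_user", "passwd": "example_pass"}
    _ = SAPRFC(credentials=fake_credentials)
    assert "Your credentials will use the DEV environment" not in caplog.text
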
1 change: 0 additions & 1 deletion tests/integration/test_vid_club.py
@@ -66,7 +66,6 @@ def test_url_string():
     expected_elements = [
         f"from={from_date}",
         f"to={to_date}",
-        "region=all",
         f"limit={items_per_page}",
         api_url,
     ]
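
The dropped "region=all" element reflects the new default in the Vid Club source: with region=None the region parameter is omitted from the URL, so the test now checks only the remaining query elements. A quick illustration of the two URL shapes, reusing the f-string construction introduced in viadot/sources/vid_club.py further down (the endpoint value is a placeholder, not the real Vid Club API):

api_url = "https://example.com/api/"  # placeholder endpoint
from_date, to_date, items_per_page = "2023-01-01", "2023-01-31", 100

for region in (None, "pl"):
    region_url_string = f"&region={region}" if region else ""
    url = f"{api_url}jobs?from={from_date}&to={to_date}{region_url_string}&limit={items_per_page}"
    print(url)
# https://example.com/api/jobs?from=2023-01-01&to=2023-01-31&limit=100
# https://example.com/api/jobs?from=2023-01-01&to=2023-01-31&region=pl&limit=100
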
2 changes: 1 addition & 1 deletion tests/unit/test_utils.py
@@ -8,8 +8,8 @@
 from viadot.utils import (
     add_viadot_metadata_columns,
     check_if_empty_file,
-    gen_bulk_insert_query_from_df,
     check_value,
+    gen_bulk_insert_query_from_df,
 )
 
 EMPTY_CSV_PATH = "empty.csv"
4 changes: 2 additions & 2 deletions viadot/flows/vid_club_to_adls.py
@@ -31,7 +31,7 @@ def __init__(
         from_date: str = "2022-03-22",
         to_date: str = None,
         items_per_page: int = 100,
-        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all",
+        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
         days_interval: int = 30,
         cols_to_drop: List[str] = None,
         vid_club_credentials: Dict[str, Any] = None,
@@ -60,7 +60,7 @@ def __init__(
             from_date (str, optional): Start date for the query; by default the oldest date in the data, 2022-03-22.
             to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. Defaults to 100.
-            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API]
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in the URL). [December 2023 status: value 'all' does not work for company and jobs]
             days_interval (int, optional): Days specified in date range per API call (tests showed that 30-40 is optimal for performance). Defaults to 30.
             cols_to_drop (List[str], optional): List of columns to drop. Defaults to None.
             vid_club_credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None.
1 change: 1 addition & 0 deletions viadot/sources/sap_bw.py
@@ -1,5 +1,6 @@
 import textwrap
 from typing import List
+
 import pyrfc
 
 from viadot.exceptions import CredentialError, ValidationError
20 changes: 16 additions & 4 deletions viadot/sources/sap_rfc.py
@@ -257,9 +257,15 @@ def __init__(
 
         self._con = None
         DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV")
-        credentials = kwargs.pop("credentials", None) or DEFAULT_CREDENTIALS
+
+        credentials = kwargs.pop("credentials", None)
         if credentials is None:
-            raise CredentialError("Missing credentials.")
+            credentials = DEFAULT_CREDENTIALS
+            if credentials is None:
+                raise CredentialError("Missing credentials.")
+            logger.warning(
+                "Your credentials will use the DEV environment. If you would like to use a different one, please specify it."
+            )
 
         super().__init__(*args, credentials=credentials, **kwargs)
 
@@ -694,9 +700,15 @@ def __init__(
 
         self._con = None
         DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV")
-        credentials = kwargs.pop("credentials", None) or DEFAULT_CREDENTIALS
+
+        credentials = kwargs.pop("credentials", None)
         if credentials is None:
-            raise CredentialError("Missing credentials.")
+            credentials = DEFAULT_CREDENTIALS
+            if credentials is None:
+                raise CredentialError("Missing credentials.")
+            logger.warning(
+                "Your credentials will use the DEV environment. If you would like to use a different one, please specify it in the 'sap_credentials' variable inside the flow."
+            )
 
         super().__init__(*args, credentials=credentials, **kwargs)
 
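
Both __init__ changes follow the same pattern: pop an explicit credentials argument, fall back to the DEV entry of the local config, and log a warning whenever the fallback is used. A minimal standalone sketch of that pattern (illustrative only — local_config and the logger below are stand-ins for viadot's own config object and Prefect logging):

import logging

logger = logging.getLogger(__name__)

# Illustrative stand-in for viadot's local_config; the real values come from the config file.
local_config = {"SAP": {"DEV": {"user": "dev_user", "passwd": "dev_pass"}}}


def resolve_sap_credentials(**kwargs) -> dict:
    """Return explicit credentials if given, otherwise fall back to DEV and warn."""
    default_credentials = local_config.get("SAP", {}).get("DEV")
    credentials = kwargs.pop("credentials", None)
    if credentials is None:
        credentials = default_credentials
        if credentials is None:
            raise ValueError("Missing credentials.")  # viadot raises CredentialError here
        logger.warning(
            "Your credentials will use the DEV environment. "
            "If you would like to use a different one, please specify it."
        )
    return credentials
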
32 changes: 20 additions & 12 deletions viadot/sources/vid_club.py
@@ -1,6 +1,7 @@
 import json
 import os
 import urllib
+from pandas.io.json import json_normalize
 from datetime import date, datetime, timedelta
 from typing import Any, Dict, List, Literal, Tuple
 
@@ -46,7 +47,7 @@ def build_query(
         api_url: str,
         items_per_page: int,
         source: Literal["jobs", "product", "company", "survey"] = None,
-        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all",
+        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
     ) -> str:
         """
         Builds the query from the inputs.
@@ -57,7 +58,7 @@
             api_url (str): Generic part of the URL to Vid Club API.
             items_per_page (int): Number of entries per page.
             source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None.
-            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API]
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in the URL). [December 2023 status: value 'all' does not work for company and jobs]
 
         Returns:
             str: Final query with all filters added.
@@ -66,7 +67,8 @@
             ValidationError: If any source different than the ones in the list are used.
         """
         if source in ["jobs", "product", "company"]:
-            url = f"{api_url}{source}?from={from_date}&to={to_date}&region={region}&limit={items_per_page}"
+            region_url_string = f"&region={region}" if region else ""
+            url = f"{api_url}{source}?from={from_date}&to={to_date}{region_url_string}&limit={items_per_page}"
         elif source == "survey":
             url = f"{api_url}{source}?language=en&type=question"
         else:
@@ -128,7 +130,7 @@ def check_connection(
         from_date: str = "2022-03-22",
         to_date: str = None,
         items_per_page: int = 100,
-        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all",
+        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
         url: str = None,
     ) -> Tuple[Dict[str, Any], str]:
         """
@@ -140,7 +142,7 @@
             from_date (str, optional): Start date for the query; by default the oldest date in the data, 2022-03-22.
             to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. 100 entries by default.
-            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API]
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in the URL). [December 2023 status: value 'all' does not work for company and jobs]
             url (str, optional): Generic part of the URL to Vid Club API. Defaults to None.
 
         Returns:
@@ -173,7 +175,6 @@ def check_connection(
             url=first_url, headers=headers, method="GET", verify=False
         )
         response = response.json()
-
         return (response, first_url)
 
     def get_response(
@@ -182,7 +183,7 @@ def get_response(
         from_date: str = "2022-03-22",
         to_date: str = None,
         items_per_page: int = 100,
-        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all",
+        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
     ) -> pd.DataFrame:
         """
         Based on the pagination type retrieved with check_connection, gets the response from the queried API and transforms it into a DataFrame.
@@ -192,7 +193,7 @@ def get_response(
             from_date (str, optional): Start date for the query; by default the oldest date in the data, 2022-03-22.
             to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. 100 entries by default.
-            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API]
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in the URL). [December 2023 status: value 'all' does not work for company and jobs]
 
         Returns:
             pd.DataFrame: Table of the data carried in the response.
@@ -229,7 +230,8 @@ def get_response(
             ind = False
 
         if "data" in keys_list:
-            df = pd.DataFrame(response["data"])
+            df = json_normalize(response["data"])
+            df = pd.DataFrame(df)
             length = df.shape[0]
             page = 1
 
@@ -244,7 +246,8 @@ def get_response(
                     url=url, headers=headers, method="GET", verify=False
                 )
                 response = r.json()
-                df_page = pd.DataFrame(response["data"])
+                df_page = json_normalize(response["data"])
+                df_page = pd.DataFrame(df_page)
                 if source == "product":
                     df_page = df_page.transpose()
                 length = df_page.shape[0]
Expand All @@ -260,7 +263,7 @@ def total_load(
from_date: str = "2022-03-22",
to_date: str = None,
items_per_page: int = 100,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all",
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
days_interval: int = 30,
) -> pd.DataFrame:
"""
@@ -272,7 +275,7 @@
             from_date (str, optional): Start date for the query; by default the oldest date in the data, 2022-03-22.
             to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. 100 entries by default.
-            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API]
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in the URL). [December 2023 status: value 'all' does not work for company and jobs]
             days_interval (int, optional): Days specified in date range per API call (tests showed that 30-40 is optimal for performance). Defaults to 30.
 
         Returns:
@@ -307,6 +310,11 @@ def total_load(
             items_per_page=items_per_page,
             region=region,
         )
+        list_columns = df.columns[
+            df.applymap(lambda x: isinstance(x, list)).any()
+        ].tolist()
+        for i in list_columns:
+            df[i] = df[i].apply(lambda x: tuple(x) if isinstance(x, list) else x)
         df.drop_duplicates(inplace=True)
 
         if df.empty:
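
Three behavioural changes are bundled in this file: build_query now appends &region=... only when a region is given (so the new None default leaves the parameter out of the URL), get_response flattens nested JSON with json_normalize before building the DataFrame, and total_load converts list-valued columns to tuples so that drop_duplicates does not fail on unhashable cells. A small self-contained sketch of that deduplication step on toy data (not viadot code):

import pandas as pd

# A frame with a list-valued column; calling drop_duplicates() directly on it
# raises "TypeError: unhashable type: 'list'".
df = pd.DataFrame({"id": [1, 1, 2], "tags": [["a", "b"], ["a", "b"], ["c"]]})

# Same approach as total_load(): find list-valued columns, cast lists to tuples,
# then deduplicate. (applymap is what the commit uses; newer pandas prefers DataFrame.map.)
list_columns = df.columns[df.applymap(lambda x: isinstance(x, list)).any()].tolist()
for col in list_columns:
    df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)
df = df.drop_duplicates()

print(df)  # rows 0 and 1 collapse into a single row
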
2 changes: 1 addition & 1 deletion viadot/tasks/genesys.py
@@ -13,8 +13,8 @@
 
 from viadot.exceptions import APIError
 from viadot.sources import Genesys
-from viadot.utils import check_value
 from viadot.task_utils import *
+from viadot.utils import check_value
 
 logger = logging.get_logger()
 
4 changes: 2 additions & 2 deletions viadot/tasks/vid_club.py
@@ -85,7 +85,7 @@ def run(
         from_date: str = "2022-03-22",
         to_date: str = None,
         items_per_page: int = 100,
-        region: str = "all",
+        region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
         days_interval: int = 30,
         cols_to_drop: List[str] = None,
     ) -> pd.DataFrame:
@@ -98,7 +98,7 @@
             from_date (str, optional): Start date for the query; by default the oldest date in the data, '2022-03-22'.
             to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. 100 entries by default.
-            region (str, optional): Region filter for the query. Valid inputs: ["bg", "hu", "hr", "pl", "ro", "si", "all"]. Defaults to "all".
+            region (str, optional): Region filter for the query. Valid inputs: ["bg", "hu", "hr", "pl", "ro", "si", "all"]. Defaults to None.
             days_interval (int, optional): Days specified in date range per API call (tests showed that 30-40 is optimal for performance). Defaults to 30.
             cols_to_drop (List[str], optional): List of columns to drop. Defaults to None.
 
2 changes: 1 addition & 1 deletion viadot/utils.py
@@ -2,7 +2,7 @@
 import os
 import re
 from itertools import chain
-from typing import Union, Any, Callable, Dict, List, Literal
+from typing import Any, Callable, Dict, List, Literal, Union
 
 import pandas as pd
 import prefect
