Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve utils.py test coverage #817

Merged
merged 11 commits into from
Dec 6, 2023
230 changes: 184 additions & 46 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@

import pandas as pd
import pytest
from viadot.exceptions import APIError

from viadot.signals import SKIP
from viadot.sources import AzureSQL
from viadot.utils import (
add_viadot_metadata_columns,
check_if_empty_file,
gen_bulk_insert_query_from_df,
check_value,
get_flow_last_run_date,
get_nested_value,
get_sql_server_table_dtypes,
slugify,
handle_api_response,
union_dict,
)

EMPTY_CSV_PATH = "empty.csv"
EMPTY_PARQUET_PATH = "empty.parquet"


class ClassForDecorator:
class ClassForMetadataDecorator:
source = "Source_name"

def __init__(self):
Expand All @@ -34,7 +41,47 @@ def to_df_decorated_parameter(self):
return self.df


def test_single_quotes_inside():
@pytest.fixture(scope="function")
def example_dataframe():
    """Provide a small DataFrame whose last row carries an extra 'balance' value.

    pandas pads the shorter rows with NaN, so the first two rows end up with a
    NULL balance — exercised by the bulk-insert query tests.
    """
    records = [
        (1, "_suffixnan", 1),
        (2, "Noneprefix", 0),
        (3, "fooNULLbar", 1, 2.34),
    ]
    return pd.DataFrame(records, columns=["id", "name", "is_deleted", "balance"])


@pytest.fixture(scope="function")
def azure_sql():
    """Yield an AzureSQL source configured from the 'AZURE_SQL' config key."""
    connection = AzureSQL(config_key="AZURE_SQL")
    yield connection


@pytest.fixture(scope="function")
def nested_dict():
    """Provide a multi-level dictionary used by the get_nested_value() tests."""
    return {
        "first_known_lvl": {
            "second_known_lvl": {
                "third_known_lvl": {
                    "searched_lvl": {
                        "searched_phrase_1": "First value",
                        "searched_phrase_2": None,
                        "searched_phrase_3": "Found it!",
                    }
                }
            }
        },
        "first_known_lvl_2": {
            "second_known_lvl_2": {"searched_phrase_2": "Found it_2!"}
        },
    }


def test_slugify():
    """slugify() lowercases text and replaces spaces with underscores."""
    result = slugify("Text With Spaces Before Changes")
    assert result == "text_with_spaces_before_changes"


def test_bulk_insert_query_from_df_single_quotes_inside():
TEST_VALUE = "a'b"
df1 = pd.DataFrame(
{
Expand All @@ -56,7 +103,7 @@ def test_single_quotes_inside():
), test_insert_query


def test_single_quotes_outside():
def test_bulk_insert_query_from_df_single_quotes_outside():
TEST_VALUE = "'a'"
df1 = pd.DataFrame(
{
Expand All @@ -78,7 +125,7 @@ def test_single_quotes_outside():
), test_insert_query


def test_double_quotes_inside():
def test_bulk_insert_query_from_df_double_quotes_inside():
TEST_VALUE = 'a "b"'
df1 = pd.DataFrame(
{
Expand All @@ -100,6 +147,34 @@ def test_double_quotes_inside():
), test_insert_query


def test_bulk_insert_query_from_df_not_implemented():
    """gen_bulk_insert_query_from_df() must reject single-column DataFrames.

    Fixes the typo in the original test name ("implemeted" -> "implemented");
    pytest still discovers the test via the `test_` prefix, so no caller breaks.
    """
    TEST_VALUE = 'a "b"'
    df1 = pd.DataFrame({"a": [TEST_VALUE]})
    with pytest.raises(
        NotImplementedError,
        match="this function only handles DataFrames with at least two columns.",
    ):
        gen_bulk_insert_query_from_df(df1, table_fqn="test_schema.test_table")


def test_bulk_insert_query_from_df_full_return(example_dataframe):
    """Check the full INSERT statement built from a DataFrame plus constant columns.

    Extra keyword args (status, address) become literal columns appended to
    every VALUES tuple; the NaN balances render as NULL.
    """
    query = gen_bulk_insert_query_from_df(
        example_dataframe,
        table_fqn="users",
        chunksize=1000,
        status="APPROVED",
        address=None,
    )

    expected_result = """INSERT INTO users (id, name, is_deleted, balance, status, address)

VALUES (1, '_suffixnan', 1, NULL, 'APPROVED', NULL),
(2, 'Noneprefix', 0, NULL, 'APPROVED', NULL),
(3, 'fooNULLbar', 1, 2.34, 'APPROVED', NULL)"""

    assert query == expected_result


def test_check_if_empty_file_csv(caplog):
with open(EMPTY_CSV_PATH, "w"):
pass
Expand Down Expand Up @@ -139,66 +214,129 @@ def test_check_if_empty_file_no_data(caplog):


def test_add_viadot_metadata_columns_base():
df_base = ClassForDecorator().to_df()
df_decorated = ClassForDecorator().to_df_decorated()
df_base = ClassForMetadataDecorator().to_df()
df_decorated = ClassForMetadataDecorator().to_df_decorated()

assert df_base.columns.to_list() == ["a", "b"]
assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"]
assert df_decorated["_viadot_source"][0] == "ClassForDecorator"
assert df_decorated["_viadot_source"][0] == "ClassForMetadataDecorator"


def test_add_viadot_metadata_columns_with_parameter():
df_base = ClassForDecorator().to_df()
df_decorated = ClassForDecorator().to_df_decorated_parameter()
df_base = ClassForMetadataDecorator().to_df()
df_decorated = ClassForMetadataDecorator().to_df_decorated_parameter()

assert df_base.columns.to_list() == ["a", "b"]
assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"]
assert df_decorated["_viadot_source"][0] == "Source_name"


# Sample test checking the correctness of the function when the key is found
def test_check_value_found():
json_data = {
"first_known_lvl": {
"second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}}
}
}
result = check_value(
json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"],
["searched_phrase"],
def test_handle_api_response_wrong_method():
    """handle_api_response() rejects an unsupported HTTP method with ValueError."""
    with pytest.raises(ValueError, match="Method not found."):
        handle_api_response(
            url="https://api.api-ninjas.com/v1/randomuser", method="WRONG_METHOD"
        )


def test_handle_api_response_credentials_not_provided():
    """Calling an authenticated endpoint without credentials raises APIError."""
    api_url = "https://api.api-ninjas.com/v1/randomuser"
    expected_msg = "Perhaps your account credentials need to be refreshed?"
    with pytest.raises(APIError, match=expected_msg):
        handle_api_response(url=api_url)


def test_handle_api_response_wrong_url():
    """An unreachable URL surfaces as an APIError about connection issues."""
    with pytest.raises(APIError, match="failed due to connection issues."):
        handle_api_response(url="https://test.com/")


def test_handle_api_response_unknown_error():
    """A value that is not a URL at all raises APIError with 'Unknown error'."""
    not_a_url = "test_string"
    with pytest.raises(APIError, match="Unknown error"):
        handle_api_response(url=not_a_url)


def test_handle_api_response_return_type():
    """A successful call returns a response object with HTTP status 200."""
    response = handle_api_response(url="https://jsonplaceholder.typicode.com/posts")
    assert response.status_code == 200


def test_get_sql_server_table_dtypes(azure_sql):
    """get_sql_server_table_dtypes() returns column name -> dtype mapping.

    Creates a throwaway table, then asserts the reflected dtypes match
    (SQL Server reports type names lowercased).

    Fix: removes the stray `assert result == "phrase"` line — a leftover diff
    artifact from a deleted test that referenced an undefined `result` name
    and would raise NameError mid-test.
    """
    SCHEMA = "sandbox"
    TABLE = "test_table_dtypes"
    dtypes = {"country": "VARCHAR(100)", "sales": "INT"}

    azure_sql.create_table(
        schema=SCHEMA, table=TABLE, dtypes=dtypes, if_exists="replace"
    )

    dtypes = get_sql_server_table_dtypes(schema=SCHEMA, table=TABLE, con=azure_sql.con)
    assert isinstance(dtypes, dict)
    assert list(dtypes.keys()) == ["country", "sales"]
    assert list(dtypes.values()) == ["varchar(100)", "int"]

# Sample test checking the correctness of the function when the key is not found
def test_check_value_not_found():
json_data = {
"first_known_lvl": {
"second_known_lvl": {
"third_known_lvl": {"other_phrase": "This won't be found"}
}
}
}
result = check_value(
json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"],
["searched_phrase"],

def test_union_dict_return():
    """union_dict() merges two dictionaries into a single new dict."""
    first = {"a": 1}
    second = {"b": 2}
    merged = union_dict(first, second)
    assert isinstance(merged, dict)
    assert merged == {"a": 1, "b": 2}


def test_get_nested_value_found(nested_dict):
    """get_nested_value() returns the value when all searched levels exist.

    Fix: removes the stray `assert result is None` line — a leftover diff
    artifact from a deleted test that directly contradicted the following
    `assert result == "Found it!"` and would always fail.
    """
    result = get_nested_value(
        nested_dict=nested_dict["first_known_lvl"]["second_known_lvl"][
            "third_known_lvl"
        ],
        levels_to_search=["searched_lvl", "searched_phrase_3"],
    )
    assert result == "Found it!"


# Sample test checking the correctness of the function with an empty dictionary
def test_check_value_empty_dict():
json_data = {}
result = check_value(json_data, ["searched_phrase"])
def test_get_nested_value_not_found(nested_dict):
    """get_nested_value() yields None when the searched key is absent."""
    missing = get_nested_value(
        nested_dict["first_known_lvl"]["second_known_lvl"]["third_known_lvl"],
        levels_to_search=["searched_wrong_lvl"],
    )
    assert missing is None


# Sample test checking the correctness of the function with a nonexistent key
def test_check_value_nonexistent_key():
json_data = {
"first_known_lvl": {
"second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}}
}
def test_get_nested_value_nested_dict_is_string(caplog):
    """Passing a non-dict as nested_dict emits a warning instead of raising."""
    with caplog.at_level(logging.WARNING):
        get_nested_value(
            nested_dict="this_is_not_dict",
            levels_to_search=["searched_phrase"],
        )
    assert "The 'nested_dict' must be a dictionary." in caplog.text


def test_get_nested_value_without_levels(nested_dict):
    """Without levels_to_search, get_nested_value() drills to the deepest dict.

    Fix: removes the stray `result = check_value(json_data, ...)` and
    `assert result is None` lines — leftover diff artifacts from a deleted
    test that referenced the undefined names `check_value` and `json_data`
    and would raise NameError.
    """
    result_1 = get_nested_value(nested_dict=nested_dict)
    result_2 = get_nested_value(nested_dict=nested_dict["first_known_lvl_2"])

    assert result_1 == {
        "searched_phrase_1": "First value",
        "searched_phrase_2": None,
        "searched_phrase_3": "Found it!",
    }
    assert result_2 == {"searched_phrase_2": "Found it_2!"}
4 changes: 2 additions & 2 deletions viadot/sources/sharepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from office365.sharepoint.client_context import ClientContext
from prefect.utilities import logging

from viadot.utils import get_nested_dict
from viadot.utils import get_nested_value

from ..config import local_config
from ..exceptions import CredentialError
Expand Down Expand Up @@ -168,7 +168,7 @@ def _unpack_fields(
item_values_dict = list_item.properties
if item_values_dict:
for field, val in item_values_dict.items():
nested_dict = get_nested_dict(val)
nested_dict = get_nested_value(val)
# Check if the values are nested
if nested_dict != None:
# Check if field has expandable type
Expand Down
50 changes: 31 additions & 19 deletions viadot/tasks/genesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from viadot.exceptions import APIError
from viadot.sources import Genesys
from viadot.utils import check_value
from viadot.utils import get_nested_value
from viadot.task_utils import *

logger = logging.get_logger()
Expand Down Expand Up @@ -537,31 +537,43 @@ def run(
# For loop to extract data from specific page
for id in range(0, num_ids):
record_dict = {}
record_dict["Id"] = check_value(json_file["entities"][id], ["id"])
record_dict["Name"] = check_value(
json_file["entities"][id], ["name"]
record_dict["Id"] = get_nested_value(
nested_dict=json_file["entities"][id], levels_to_search=["id"]
)
record_dict["DivisionName"] = check_value(
json_file["entities"][id], ["division", "name"]
record_dict["Name"] = get_nested_value(
nested_dict=json_file["entities"][id], levels_to_search=["name"]
)
record_dict["Email"] = check_value(
json_file["entities"][id], ["email"]
record_dict["DivisionName"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["division", "name"],
)
record_dict["State"] = check_value(
json_file["entities"][id], ["state"]
record_dict["Email"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["email"],
)
record_dict["Title"] = check_value(
json_file["entities"][id], ["title"]
record_dict["State"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["state"],
)
record_dict["Username"] = check_value(
json_file["entities"][id], ["username"]
record_dict["Title"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["title"],
)
record_dict["SystemPresence"] = check_value(
json_file["entities"][id],
["presence", "presenceDefinition", "systemPresence"],
record_dict["Username"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["username"],
)
record_dict["DateLastLogin"] = check_value(
json_file["entities"][id], ["dateLastLogin"]
record_dict["SystemPresence"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=[
"presence",
"presenceDefinition",
"systemPresence",
],
)
record_dict["DateLastLogin"] = get_nested_value(
nested_dict=json_file["entities"][id],
levels_to_search=["dateLastLogin"],
)

data_list.append(record_dict)
Expand Down
Loading
Loading