diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index c29fbc014..21517ec8e 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -3,12 +3,20 @@ import pandas as pd import pytest +from viadot.exceptions import APIError from viadot.signals import SKIP +from viadot.sources import AzureSQL from viadot.utils import ( add_viadot_metadata_columns, check_if_empty_file, - check_value, + gen_bulk_insert_query_from_df, + get_flow_last_run_date, + get_nested_value, + get_sql_server_table_dtypes, + slugify, + handle_api_response, + union_dict, gen_bulk_insert_query_from_df, ) @@ -16,7 +24,7 @@ EMPTY_PARQUET_PATH = "empty.parquet" -class ClassForDecorator: +class ClassForMetadataDecorator: source = "Source_name" def __init__(self): @@ -34,7 +42,47 @@ def to_df_decorated_parameter(self): return self.df -def test_single_quotes_inside(): +@pytest.fixture(scope="function") +def example_dataframe(): + data = [(1, "_suffixnan", 1), (2, "Noneprefix", 0), (3, "fooNULLbar", 1, 2.34)] + return pd.DataFrame(data, columns=["id", "name", "is_deleted", "balance"]) + + +@pytest.fixture(scope="function") +def azure_sql(): + azure_sql = AzureSQL(config_key="AZURE_SQL") + yield azure_sql + + +@pytest.fixture(scope="function") +def nested_dict(): + nested_dict = { + "first_known_lvl": { + "second_known_lvl": { + "third_known_lvl": { + "searched_lvl": { + "searched_phrase_1": "First value", + "searched_phrase_2": None, + "searched_phrase_3": "Found it!", + } + } + } + }, + "first_known_lvl_2": { + "second_known_lvl_2": {"searched_phrase_2": "Found it_2!"} + }, + } + return nested_dict + + +def test_slugify(): + """To test slugify() function functionalities work""" + test_string = "Text With Spaces Before Changes" + string_after_changes = slugify(test_string) + assert string_after_changes == "text_with_spaces_before_changes" + + +def test_bulk_insert_query_from_df_single_quotes_inside(): TEST_VALUE = "a'b" df1 = pd.DataFrame( { @@ -56,7 +104,7 @@ def 
test_single_quotes_inside(): ), test_insert_query -def test_single_quotes_outside(): +def test_bulk_insert_query_from_df_single_quotes_outside(): TEST_VALUE = "'a'" df1 = pd.DataFrame( { @@ -78,7 +126,7 @@ def test_single_quotes_outside(): ), test_insert_query -def test_double_quotes_inside(): +def test_bulk_insert_query_from_df_double_quotes_inside(): TEST_VALUE = 'a "b"' df1 = pd.DataFrame( { @@ -100,6 +148,34 @@ def test_double_quotes_inside(): ), test_insert_query +def test_bulk_insert_query_from_df_not_implemeted(): + TEST_VALUE = 'a "b"' + df1 = pd.DataFrame({"a": [TEST_VALUE]}) + with pytest.raises( + NotImplementedError, + match="this function only handles DataFrames with at least two columns.", + ): + gen_bulk_insert_query_from_df(df1, table_fqn="test_schema.test_table") + + +def test_bulk_insert_query_from_df_full_return(example_dataframe): + result = gen_bulk_insert_query_from_df( + example_dataframe, + table_fqn="users", + chunksize=1000, + status="APPROVED", + address=None, + ) + + expected_result = """INSERT INTO users (id, name, is_deleted, balance, status, address) + +VALUES (1, '_suffixnan', 1, NULL, 'APPROVED', NULL), + (2, 'Noneprefix', 0, NULL, 'APPROVED', NULL), + (3, 'fooNULLbar', 1, 2.34, 'APPROVED', NULL)""" + + assert result == expected_result + + def test_check_if_empty_file_csv(caplog): with open(EMPTY_CSV_PATH, "w"): pass @@ -139,66 +215,129 @@ def test_check_if_empty_file_no_data(caplog): def test_add_viadot_metadata_columns_base(): - df_base = ClassForDecorator().to_df() - df_decorated = ClassForDecorator().to_df_decorated() + df_base = ClassForMetadataDecorator().to_df() + df_decorated = ClassForMetadataDecorator().to_df_decorated() assert df_base.columns.to_list() == ["a", "b"] assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"] - assert df_decorated["_viadot_source"][0] == "ClassForDecorator" + assert df_decorated["_viadot_source"][0] == "ClassForMetadataDecorator" def 
test_add_viadot_metadata_columns_with_parameter(): - df_base = ClassForDecorator().to_df() - df_decorated = ClassForDecorator().to_df_decorated_parameter() + df_base = ClassForMetadataDecorator().to_df() + df_decorated = ClassForMetadataDecorator().to_df_decorated_parameter() assert df_base.columns.to_list() == ["a", "b"] assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"] assert df_decorated["_viadot_source"][0] == "Source_name" -# Sample test checking the correctness of the function when the key is found -def test_check_value_found(): - json_data = { - "first_known_lvl": { - "second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}} - } - } - result = check_value( - json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"], - ["searched_phrase"], +def test_handle_api_response_wrong_method(): + """Test to check if ValueError is thrown when wrong method is used.""" + + api_url = "https://api.api-ninjas.com/v1/randomuser" + with pytest.raises(ValueError, match="Method not found."): + handle_api_response(url=api_url, method="WRONG_METHOD") + + +def test_handle_api_response_credentials_not_provided(): + """Test to check if APIError is thrown when credentials are not provided.""" + + api_url = "https://api.api-ninjas.com/v1/randomuser" + with pytest.raises( + APIError, match="Perhaps your account credentials need to be refreshed?" 
+ ): + handle_api_response(url=api_url) + + +def test_handle_api_response_wrong_url(): + """Test to check if APIError is thrown when api_url is wrong.""" + + api_url = "https://test.com/" + with pytest.raises(APIError, match="failed due to connection issues."): + handle_api_response(url=api_url) + + +def test_handle_api_response_unknown_error(): + """Test to check if APIError is thrown when there is something other than "url" under api_url.""" + + api_url = "test_string" + with pytest.raises(APIError, match="Unknown error"): + handle_api_response(url=api_url) + + +def test_handle_api_response_return_type(): + """Test to check if the connection is successful.""" + + api_url = "https://jsonplaceholder.typicode.com/posts" + response = handle_api_response(url=api_url) + assert response.status_code == 200 + + +def test_get_sql_server_table_dtypes(azure_sql): + """Checks if dtypes is generated in a good way using `get_sql_server_table_dtypes` function.""" + + SCHEMA = "sandbox" + TABLE = "test_table_dtypes" + dtypes = {"country": "VARCHAR(100)", "sales": "INT"} + + azure_sql.create_table( + schema=SCHEMA, table=TABLE, dtypes=dtypes, if_exists="replace" ) - assert result == "phrase" + dtypes = get_sql_server_table_dtypes(schema=SCHEMA, table=TABLE, con=azure_sql.con) + assert isinstance(dtypes, dict) + assert list(dtypes.keys()) == ["country", "sales"] + assert list(dtypes.values()) == ["varchar(100)", "int"] -# Sample test checking the correctness of the function when the key is not found -def test_check_value_not_found(): - json_data = { - "first_known_lvl": { - "second_known_lvl": { - "third_known_lvl": {"other_phrase": "This won't be found"} - } - } - } - result = check_value( - json_data["first_known_lvl"]["second_known_lvl"]["third_known_lvl"], - ["searched_phrase"], + +def test_union_dict_return(): + """Check if dictionaries are unioned in the correct way.""" + a = {"a": 1} + b = {"b": 2} + unioned_dict = union_dict(a, b) + assert isinstance(unioned_dict, dict) + 
assert unioned_dict == {"a": 1, "b": 2} + + +def test_get_nested_value_found(nested_dict): + """Sample test checking the correctness of the function when the key is found.""" + result = get_nested_value( + nested_dict=nested_dict["first_known_lvl"]["second_known_lvl"][ + "third_known_lvl" + ], + levels_to_search=["searched_lvl", "searched_phrase_3"], ) - assert result is None + assert result == "Found it!" -# Sample test checking the correctness of the function with an empty dictionary -def test_check_value_empty_dict(): - json_data = {} - result = check_value(json_data, ["searched_phrase"]) +def test_get_nested_value_not_found(nested_dict): + """Sample test checking the correctness of the function when the key is not found.""" + result = get_nested_value( + nested_dict["first_known_lvl"]["second_known_lvl"]["third_known_lvl"], + levels_to_search=["searched_wrong_lvl"], + ) assert result is None -# Sample test checking the correctness of the function with a nonexistent key -def test_check_value_nonexistent_key(): - json_data = { - "first_known_lvl": { - "second_known_lvl": {"third_known_lvl": {"searched_phrase": "phrase"}} - } +def test_get_nested_value_nested_dict_is_string(caplog): + """Sample test checking the correctness of the function when non-dictionary value is provided as nested_dict.""" + with caplog.at_level(logging.WARNING): + get_nested_value( + nested_dict="this_is_not_dict", + levels_to_search=["searched_phrase"], + ) + assert "The 'nested_dict' must be a dictionary." 
in caplog.text + + +def test_get_nested_value_without_levels(nested_dict): + """Sample test checking the correctness of the function when only `nested_value` is provided.""" + result_1 = get_nested_value(nested_dict=nested_dict) + result_2 = get_nested_value(nested_dict=nested_dict["first_known_lvl_2"]) + + assert result_1 == { + "searched_phrase_1": "First value", + "searched_phrase_2": None, + "searched_phrase_3": "Found it!", } - result = check_value(json_data, ["nonexistent_key"]) - assert result is None + assert result_2 == {"searched_phrase_2": "Found it_2!"} diff --git a/viadot/sources/sharepoint.py b/viadot/sources/sharepoint.py index fbbd1b08b..08e616326 100644 --- a/viadot/sources/sharepoint.py +++ b/viadot/sources/sharepoint.py @@ -10,7 +10,7 @@ from office365.sharepoint.client_context import ClientContext from prefect.utilities import logging -from viadot.utils import get_nested_dict +from viadot.utils import get_nested_value from ..config import local_config from ..exceptions import CredentialError @@ -168,7 +168,7 @@ def _unpack_fields( item_values_dict = list_item.properties if item_values_dict: for field, val in item_values_dict.items(): - nested_dict = get_nested_dict(val) + nested_dict = get_nested_value(val) # Check if the values are nested if nested_dict != None: # Check if field has expandable type diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index feafbaccf..04d4bc8b1 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -13,8 +13,8 @@ from viadot.exceptions import APIError from viadot.sources import Genesys +from viadot.utils import get_nested_value from viadot.task_utils import * -from viadot.utils import check_value logger = logging.get_logger() @@ -590,31 +590,43 @@ def run( # For loop to extract data from specific page for id in range(0, num_ids): record_dict = {} - record_dict["Id"] = check_value(json_file["entities"][id], ["id"]) - record_dict["Name"] = check_value( - json_file["entities"][id], ["name"] 
+ record_dict["Id"] = get_nested_value( + nested_dict=json_file["entities"][id], levels_to_search=["id"] ) - record_dict["DivisionName"] = check_value( - json_file["entities"][id], ["division", "name"] + record_dict["Name"] = get_nested_value( + nested_dict=json_file["entities"][id], levels_to_search=["name"] ) - record_dict["Email"] = check_value( - json_file["entities"][id], ["email"] + record_dict["DivisionName"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["division", "name"], ) - record_dict["State"] = check_value( - json_file["entities"][id], ["state"] + record_dict["Email"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["email"], ) - record_dict["Title"] = check_value( - json_file["entities"][id], ["title"] + record_dict["State"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["state"], ) - record_dict["Username"] = check_value( - json_file["entities"][id], ["username"] + record_dict["Title"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["title"], ) - record_dict["SystemPresence"] = check_value( - json_file["entities"][id], - ["presence", "presenceDefinition", "systemPresence"], + record_dict["Username"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["username"], ) - record_dict["DateLastLogin"] = check_value( - json_file["entities"][id], ["dateLastLogin"] + record_dict["SystemPresence"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=[ + "presence", + "presenceDefinition", + "systemPresence", + ], + ) + record_dict["DateLastLogin"] = get_nested_value( + nested_dict=json_file["entities"][id], + levels_to_search=["dateLastLogin"], ) data_list.append(record_dict) diff --git a/viadot/utils.py b/viadot/utils.py index cd34adb8a..654a408d1 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -23,6 +23,14 @@ def slugify(name: str) -> str: + """Function to change 
spaces to underscores and convert all characters to lowercase. + + Args: + name (str): String to convert. + + Returns: + str: Output text after conversion. + """ return name.replace(" ", "_").lower() @@ -137,12 +145,12 @@ def get_flow_last_run_date(flow_name: str) -> str: def get_sql_server_table_dtypes( - table, con: pyodbc.Connection, schema: str = None + table: str, con: pyodbc.Connection, schema: str = None ) -> dict: """Get column names and types from a SQL Server database table. Args: - table (_type_): The table for which to fetch dtypes. + table (str): The table for which to fetch dtypes. con (pyodbc.Connection): The connection to the database where the table is located. schema (str, optional): The schema where the table is located. Defaults to None. @@ -257,7 +265,7 @@ def build_merge_query( def gen_bulk_insert_query_from_df( - df: pd.DataFrame, table_fqn: str, chunksize=1000, **kwargs + df: pd.DataFrame, table_fqn: str, chunksize: int = 1000, **kwargs ) -> str: """ Converts a DataFrame to a bulk INSERT query. @@ -265,6 +273,7 @@ def gen_bulk_insert_query_from_df( Args: df (pd.DataFrame): The DataFrame which data should be put into the INSERT query. table_fqn (str): The fully qualified name (schema.table) of the table to be inserted into. + chunksize (int, optional): The size of chunk. Defaults to 1000. Returns: str: A bulk insert query that will insert all data from `df` into `table_fqn`. 
def get_nested_value(
    nested_dict: dict,
    levels_to_search: List[str] = None,
) -> Union[None, Any]:
    """
    Retrieve a value from a nested dictionary.

    The function works in one of two modes:

    - If `levels_to_search` is provided, the keys are followed one by one, starting
      from `nested_dict`, and the value found under the last key is returned.
      If a non-dictionary value is reached before all keys are consumed, that value
      is returned as-is and the remaining keys are ignored.
    - If `levels_to_search` is not provided, the function keeps descending into the
      first value of each dictionary for as long as that value is a dictionary, and
      returns the deepest dictionary reached this way (i.e. the first dictionary
      whose first value is not a dictionary).

    The `levels_to_search` list is never modified.

    Args:
        nested_dict (dict): The nested dictionary to search for the value.
        levels_to_search (List[str], optional): List of keys representing the levels
            to search, ordered from the outermost to the innermost. An empty list
            returns `nested_dict` itself. Defaults to None.

    Returns:
        Union[None, Any]: The value found at the specified levels, or the deepest
            first dictionary when `levels_to_search` is not provided.
            Returns None if any of the specified levels is missing, if `nested_dict`
            is not a dictionary, or (without `levels_to_search`) if an empty
            dictionary is reached.

    Examples:
        >>> data = {"division": {"name": "Sales"}, "id": 7}
        >>> get_nested_value(data, ["division", "name"])
        'Sales'
        >>> get_nested_value(data, ["division"])
        {'name': 'Sales'}
        >>> get_nested_value({"a": {"b": {"c": 1, "d": 2}}})
        {'c': 1, 'd': 2}
    """
    if levels_to_search is None:
        # Discovery mode. Callers pass arbitrary field values here (scalars, None,
        # lists...), so anything that is not a dictionary quietly yields None
        # instead of failing on a missing `.values()` method.
        current = nested_dict
        while isinstance(current, dict) and current:
            first_value = next(iter(current.values()))
            if not isinstance(first_value, dict):
                return current
            current = first_value
        return None

    current = nested_dict
    # Iterate without consuming the list, so the caller's argument stays intact.
    for level in levels_to_search:
        try:
            current = current[level]
        except KeyError:
            return None
        except TypeError as e:
            # Only reachable for the top-level object: deeper levels are always
            # dictionaries, because the walk stops at the first non-dictionary.
            logger.error(f"The 'nested_dict' must be a dictionary. {e}")
            return None
        if not isinstance(current, dict):
            return current
    # All levels consumed and the target itself is a dictionary - return it.
    return current