diff --git a/.gitignore b/.gitignore
index f2bed40d8..58e12c8ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -187,3 +187,6 @@ profiles.yaml
 
 # AWS
 .aws
+
+# Rye
+.rye/
diff --git a/pyproject.toml b/pyproject.toml
index d67a552bb..1daacd948 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "viadot2"
-version = "2.1.24"
+version = "2.1.26"
 description = "A simple data ingestion library to guide data flows from some places to other places."
 authors = [
     { name = "acivitillo", email = "acivitillo@dyvenia.com" },
@@ -23,7 +23,6 @@ dependencies = [
     "openpyxl>=3.0.0",
     "prefect>=2.19.7, <3",
     "prefect-sqlalchemy>=0.4.3",
-    "pandas>=1.2.0",
     "duckdb==1.0.0",
     "requests>=2.32.3",
     "prefect-github>=0.2.7",
@@ -37,6 +36,8 @@ dependencies = [
     "simple-salesforce==1.12.6",
     "pandas-gbq==0.23.1",
     "paramiko>=3.5.0",
+    # awswrangler 2.x depends on pandas 1.x.
+    "pandas<2.0",
     "TM1py==1.11.3",
 ]
 requires-python = ">=3.10"
@@ -58,10 +59,11 @@ aws = [
     "boto3==1.34.106",
     "dbt-redshift>=1.3, <1.8",
     "minio>=7.0, <8.0",
+    # We need to pin this to 2.x because some code depends on 2.x-only functionality.
     "awswrangler>=2.20.1, <3.0",
     "prefect-aws>=0.4.19",
 ]
-sap = ["pyrfc"]
+sap = ["pyrfc==3.3.1"]
 
 [tool.rye]
 managed = true
diff --git a/requirements-dev.lock b/requirements-dev.lock
index 1d1a7d78f..f3a89a283 100644
--- a/requirements-dev.lock
+++ b/requirements-dev.lock
@@ -52,9 +52,9 @@ beautifulsoup4==4.12.3
     # via o365
 bleach==6.1.0
     # via nbconvert
-boto3==1.35.1
+boto3==1.34.106
     # via moto
-botocore==1.35.1
+botocore==1.34.162
     # via boto3
     # via moto
     # via s3transfer
@@ -148,7 +148,7 @@ fastjsonschema==2.20.0
 frozenlist==1.4.1
     # via aiohttp
     # via aiosignal
-fsspec==2024.6.1
+fsspec==2024.6.0
     # via prefect
 ghp-import==2.1.0
     # via mkdocs
@@ -194,7 +194,7 @@ griffe==0.47.0
 grpcio==1.66.1
     # via google-api-core
     # via grpcio-status
-grpcio-status==1.66.1
+grpcio-status==1.62.3
     # via google-api-core
 h11==0.14.0
     # via httpcore
@@ -224,7 +224,7 @@ ijson==3.3.0
     # via tm1py
 imagehash==4.3.1
     # via viadot2
-importlib-metadata==8.3.0
+importlib-metadata==6.11.0
     # via mike
 importlib-resources==6.1.3
     # via mike
@@ -377,7 +377,7 @@ nest-asyncio==1.6.0
     # via ipykernel
 networkx==3.3
     # via visions
-numpy==1.26.4
+numpy==1.23.4
     # via db-dtypes
     # via imagehash
     # via pandas
@@ -393,7 +393,7 @@ o365==2.0.36
 oauthlib==3.2.2
     # via kubernetes
     # via requests-oauthlib
-openpyxl==3.1.5
+openpyxl==3.0.10
     # via viadot2
 orjson==3.10.7
     # via prefect
@@ -409,7 +409,7 @@ packaging==24.1
     # via pytest
 paginate==0.5.6
     # via mkdocs-material
-pandas==2.2.2
+pandas==1.5.1
     # via db-dtypes
     # via mkdocs-table-reader-plugin
     # via pandas-gbq
@@ -423,7 +423,7 @@ paramiko==3.5.0
     # via viadot2
 parso==0.8.4
     # via jedi
-pathspec==0.12.1
+pathspec==0.11.2
     # via mkdocs
     # via prefect
 pendulum==2.1.2
@@ -453,7 +453,7 @@ prompt-toolkit==3.0.47
     # via ipython
 proto-plus==1.24.0
     # via google-api-core
-protobuf==5.28.2
+protobuf==4.25.5
     # via google-api-core
     # via googleapis-common-protos
     # via grpcio-status
@@ -623,7 +623,7 @@ ruamel-yaml-clib==0.2.8
 ruff==0.6.7
 s3transfer==0.10.2
     # via boto3
-scipy==1.14.0
+scipy==1.13.1
     # via imagehash
 sendgrid==6.11.0
     # via viadot2
@@ -673,6 +673,8 @@ starkbank-ecdsa==2.2.0
     # via sendgrid
 tabulate==0.9.0
     # via mkdocs-table-reader-plugin
+tangled-up-in-unicode==0.2.0
+    # via visions
 text-unidecode==1.3
     # via python-slugify
 tinycss2==1.3.0
@@ -719,14 +721,13 @@ typing-extensions==4.12.2
     # via uvicorn
 tzdata==2024.1
     # via o365
-    # via pandas
 tzlocal==5.2
     # via dateparser
     # via o365
     # via trino
 ujson==5.10.0
     # via prefect
-urllib3==2.2.2
+urllib3==1.26.20
     # via botocore
     # via docker
     # via kubernetes
@@ -736,7 +737,7 @@ uvicorn==0.30.6
     # via prefect
 verspec==0.1.0
     # via mike
-visions==0.7.6
+visions==0.7.5
     # via viadot2
 watchdog==4.0.2
     # via mkdocs
diff --git a/requirements.lock b/requirements.lock
index d8021c8f6..0ca1a46d4 100644
--- a/requirements.lock
+++ b/requirements.lock
@@ -98,7 +98,7 @@ exceptiongroup==1.2.2
 frozenlist==1.4.1
     # via aiohttp
     # via aiosignal
-fsspec==2024.6.1
+fsspec==2024.6.0
     # via prefect
 google-api-core==2.20.0
     # via google-cloud-bigquery
@@ -137,7 +137,7 @@ griffe==0.47.0
 grpcio==1.66.1
     # via google-api-core
     # via grpcio-status
-grpcio-status==1.66.1
+grpcio-status==1.62.3
     # via google-api-core
 h11==0.14.0
     # via httpcore
@@ -213,7 +213,7 @@ multimethod==1.12
     # via visions
 networkx==3.3
     # via visions
-numpy==1.26.4
+numpy==1.23.4
     # via db-dtypes
     # via imagehash
     # via pandas
@@ -229,7 +229,7 @@ o365==2.0.36
 oauthlib==3.2.2
     # via kubernetes
     # via requests-oauthlib
-openpyxl==3.1.5
+openpyxl==3.0.10
     # via viadot2
 orjson==3.10.7
     # via prefect
@@ -238,7 +238,7 @@ packaging==24.1
     # via google-cloud-bigquery
     # via pandas-gbq
     # via prefect
-pandas==2.2.2
+pandas==1.5.1
     # via db-dtypes
     # via pandas-gbq
     # via viadot2
@@ -247,7 +247,7 @@ pandas-gbq==0.23.1
     # via viadot2
 paramiko==3.5.0
     # via viadot2
-pathspec==0.12.1
+pathspec==0.11.2
     # via prefect
 pendulum==2.1.2
     # via prefect
@@ -265,7 +265,7 @@ prefect-sqlalchemy==0.4.4
     # via viadot2
 proto-plus==1.24.0
     # via google-api-core
-protobuf==5.28.2
+protobuf==4.25.5
     # via google-api-core
     # via googleapis-common-protos
     # via grpcio-status
@@ -382,7 +382,7 @@ ruamel-yaml==0.18.6
     # via prefect
 ruamel-yaml-clib==0.2.8
     # via ruamel-yaml
-scipy==1.14.0
+scipy==1.13.1
     # via imagehash
 sendgrid==6.11.0
     # via viadot2
@@ -422,6 +422,8 @@ sqlparse==0.5.1
     # via sql-metadata
 starkbank-ecdsa==2.2.0
     # via sendgrid
+tangled-up-in-unicode==0.2.0
+    # via visions
 text-unidecode==1.3
     # via python-slugify
 tm1py==1.11.3
@@ -446,20 +448,19 @@ typing-extensions==4.12.2
     # via uvicorn
 tzdata==2024.1
     # via o365
-    # via pandas
 tzlocal==5.2
     # via dateparser
     # via o365
     # via trino
 ujson==5.10.0
     # via prefect
-urllib3==2.2.2
+urllib3==1.26.20
     # via docker
     # via kubernetes
     # via requests
 uvicorn==0.30.6
     # via prefect
-visions==0.7.6
+visions==0.7.5
     # via viadot2
 websocket-client==1.8.0
     # via kubernetes
diff --git a/src/viadot/sources/sap_rfc.py b/src/viadot/sources/sap_rfc.py
index cf2ab2b23..6c089c868 100755
--- a/src/viadot/sources/sap_rfc.py
+++ b/src/viadot/sources/sap_rfc.py
@@ -714,6 +714,7 @@ def __init__(
 
         if rfc_unique_id is not None:
             self.rfc_unique_id = list(set(rfc_unique_id))
+            self._rfc_unique_id_len = {}
         else:
             self.rfc_unique_id = rfc_unique_id
 
@@ -966,24 +967,24 @@ def query(self, sql: str, sep: str | None = None) -> None:  # noqa: C901, PLR091
         col_length_total = 0
         if isinstance(self.rfc_unique_id[0], str):
             character_limit = self.rfc_total_col_width_character_limit
-            for ref_column in self.rfc_unique_id:
-                col_length_reference_column = int(
+            for rfc_unique_col in self.rfc_unique_id:
+                rfc_unique_col_len = int(
                     self.call(
                         "DDIF_FIELDINFO_GET",
                         TABNAME=table_name,
-                        FIELDNAME=ref_column,
+                        FIELDNAME=rfc_unique_col,
                     )["DFIES_TAB"][0]["LENG"]
                 )
-                if col_length_reference_column > int(
+                if rfc_unique_col_len > int(
                     self.rfc_total_col_width_character_limit / 4
                 ):
-                    msg = f"{ref_column} can't be used as unique column, too large."
+                    msg = f"{rfc_unique_col} can't be used as unique column, too large."
                     raise ValueError(msg)
                 local_limit = (
-                    self.rfc_total_col_width_character_limit
-                    - col_length_reference_column
+                    self.rfc_total_col_width_character_limit - rfc_unique_col_len
                 )
                 character_limit = min(local_limit, character_limit)
+                self._rfc_unique_id_len[rfc_unique_col] = rfc_unique_col_len
         else:
             character_limit = self.rfc_total_col_width_character_limit
 
@@ -995,21 +996,21 @@ def query(self, sql: str, sep: str | None = None) -> None:  # noqa: C901, PLR091
                 cols.append(col)
             else:
                 if isinstance(self.rfc_unique_id[0], str) and all(
-                    rfc_col not in cols for rfc_col in self.rfc_unique_id
+                    rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
                 ):
-                    for rfc_col in self.rfc_unique_id:
-                        if rfc_col not in cols:
-                            cols.append(rfc_col)
+                    for rfc_unique_col in self.rfc_unique_id:
+                        if rfc_unique_col not in cols:
+                            cols.append(rfc_unique_col)
                 lists_of_columns.append(cols)
                 cols = [col]
                 col_length_total = int(col_length)
 
         if isinstance(self.rfc_unique_id[0], str) and all(
-            rfc_col not in cols for rfc_col in self.rfc_unique_id
+            rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
         ):
-            for rfc_col in self.rfc_unique_id:
-                if rfc_col not in cols:
-                    cols.append(rfc_col)
+            for rfc_unique_col in self.rfc_unique_id:
+                if rfc_unique_col not in cols:
+                    cols.append(rfc_unique_col)
         lists_of_columns.append(cols)
 
         columns = lists_of_columns
@@ -1040,6 +1041,30 @@ def _get_alias(self, column: str) -> str:
     def _get_client_side_filter_cols(self):
         return [f[1].split()[0] for f in self.client_side_filters.items()]
 
+    def _adjust_whitespaces(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Adjust the number of whitespaces.
+
+        Add whitespace characters in each row of each unique column to achieve
+        equal length of values in these columns, ensuring proper merging of
+        subqueries.
+        """
+        for rfc_unique_col in self.rfc_unique_id:
+            # Check in SAP metadata what the declared dtype character
+            # length is.
+            rfc_unique_column_len = self._rfc_unique_id_len[rfc_unique_col]
+            actual_length_of_field = df[rfc_unique_col].str.len()
+            # Check which rows have fewer characters than specified in the
+            # column data type.
+            rows_missing_whitespaces = actual_length_of_field < rfc_unique_column_len
+            if any(rows_missing_whitespaces):
+                logger.info(f"Adding whitespaces for {rfc_unique_col} column")
+                # Check how many whitespaces are missing in each row.
+                n_missing_whitespaces = rfc_unique_column_len - actual_length_of_field
+                df.loc[rows_missing_whitespaces, rfc_unique_col] += np.char.multiply(
+                    " ", n_missing_whitespaces[rows_missing_whitespaces]
+                )
+        return df
+
     # TODO: refactor to remove linter warnings and so this can be tested.
     @add_viadot_metadata_columns
     def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR0912, PLR0915
@@ -1117,7 +1142,6 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR09
             record_key = "WA"
             data_raw = np.array(response["DATA"])
             del response
-
             # If reference columns are provided, it's not necessary to remove
             # any extra row.
             if not isinstance(self.rfc_unique_id[0], str):
@@ -1126,22 +1150,15 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR09
                 )
             else:
                 start = False
-
             records = list(_gen_split(data_raw, sep, record_key))
             del data_raw
-
             if (
                 isinstance(self.rfc_unique_id[0], str)
                 and list(df.columns) != fields
             ):
                 df_tmp = pd.DataFrame(columns=fields)
                 df_tmp[fields] = records
-                # SAP adds whitespaces to the first extracted column value.
-                # If whitespace is in unique column, it must be removed to make
-                # a proper merge.
-                for col in self.rfc_unique_id:
-                    df_tmp[col] = df_tmp[col].str.strip()
-                    df[col] = df[col].str.strip()
+                df_tmp = self._adjust_whitespaces(df_tmp)
                 df = pd.merge(df, df_tmp, on=self.rfc_unique_id, how="outer")
             elif not start:
                 df[fields] = records
diff --git a/src/viadot/utils.py b/src/viadot/utils.py
index d27378c75..ed4885680 100644
--- a/src/viadot/utils.py
+++ b/src/viadot/utils.py
@@ -1016,7 +1016,7 @@ def df_converts_bytes_to_int(df: pd.DataFrame) -> pd.DataFrame:
     Returns:
         pd.DataFrame: Data Frame after convert
     """
-    return df.map(lambda x: int(x) if isinstance(x, bytes) else x)
+    return df.applymap(lambda x: int(x) if isinstance(x, bytes) else x)
 
 
 def df_clean_column(
diff --git a/tests/unit/test_sap_rfc_2.py b/tests/unit/test_sap_rfc_2.py
index 645e3ee8a..c500403a3 100644
--- a/tests/unit/test_sap_rfc_2.py
+++ b/tests/unit/test_sap_rfc_2.py
@@ -1,5 +1,7 @@
 from collections import OrderedDict
 
+from pandas import DataFrame
+
 from viadot.utils import skip_test_on_missing_extra
 
 from .test_sap_rfc import (
@@ -104,3 +106,14 @@ def test___build_pandas_filter_query():
         sap._build_pandas_filter_query(sap.client_side_filters)
         == "thirdlongcolname == 01234"
     ), sap._build_pandas_filter_query(sap.client_side_filters)
+
+
+def test__adjust_whitespaces():
+    sap.rfc_unique_id = ["column1", "column2"]
+    sap._rfc_unique_id_len = {"column1": 5, "column2": 4}
+    data = {"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]}
+    df = DataFrame(data)
+    df = sap._adjust_whitespaces(df)
+    col_values_len = df.applymap(len)
+    check_if_length_match = col_values_len == list(sap._rfc_unique_id_len.values())
+    assert check_if_length_match.all().all()
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 2b2b716e6..245859cc7 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -125,23 +125,23 @@ def test__cast_df_cols():
             "bool_column": [True, False, True, False],
             "datetime_column": [
                 "2023-05-25 10:30:00",
-                "2023-05-20 ",
-                "2023-05-15 10:30",
-                "2023-05-10 10:30:00+00:00 ",
+                "2023-05-20 10:00:00",
+                "2023-05-15 10:30:00",
+                "2023-05-10 10:30:00",
             ],
             "int_column": [5, 10, 15, 20],
             "object_column": ["apple", "banana", "melon", "orange"],
         }
     )
 
     test_df["datetime_column"] = pd.to_datetime(
-        test_df["datetime_column"], format="mixed"
+        test_df["datetime_column"], infer_datetime_format=True
    )
     result_df = _cast_df_cols(
         test_df, types_to_convert=["datetime", "bool", "int", "object"]
     )
 
     assert result_df["bool_column"].dtype == pd.Int64Dtype()
-    assert result_df["datetime_column"].dtype == pd.StringDtype()
+    assert pd.api.types.is_object_dtype(result_df["datetime_column"])
     assert result_df["int_column"].dtype == pd.Int64Dtype()
     assert result_df["object_column"].dtype == pd.StringDtype()
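
Reviewer note: `DataFrame.map` only exists from pandas 2.1 onward, so with `pandas<2.0` pinned in pyproject.toml, reverting `df.map` to `df.applymap` in `df_converts_bytes_to_int` is the compatible call. Below is a minimal, self-contained sketch of the padding technique that `_adjust_whitespaces` applies; the column names and declared lengths are invented for illustration (in the source they come from SAP's DDIF_FIELDINFO_GET metadata, cached in `_rfc_unique_id_len`):

import numpy as np
import pandas as pd

# Hypothetical unique columns and their declared field lengths
# (stand-ins for what DDIF_FIELDINFO_GET would report).
declared_len = {"column1": 5, "column2": 4}

df = pd.DataFrame({"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]})

for col, target_len in declared_len.items():
    actual_len = df[col].str.len()
    too_short = actual_len < target_len
    if too_short.any():
        # np.char.multiply repeats " " per row by the number of missing
        # characters, so every value ends up exactly target_len long.
        df.loc[too_short, col] += np.char.multiply(
            " ", (target_len - actual_len)[too_short]
        )

# All values now match the declared lengths, so the outer merge keyed on
# these columns aligns rows coming from separate subqueries.
assert (df.applymap(len) == list(declared_len.values())).all().all()

This mirrors what the new test__adjust_whitespaces asserts: padding replaces the old str.strip() approach, which destroyed the trailing whitespace SAP itself pads values with and could make merge keys inconsistent between subqueries.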