diff --git a/.gitignore b/.gitignore
index f2bed40d8..58e12c8ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -187,3 +187,6 @@ profiles.yaml
 
 # AWS
 .aws
+
+# Rye
+.rye/
diff --git a/pyproject.toml b/pyproject.toml
index 62cedba52..0e99bea61 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,7 +23,6 @@ dependencies = [
     "openpyxl>=3.0.0",
     "prefect>=2.19.7, <3",
     "prefect-sqlalchemy>=0.4.3",
-    "pandas>=1.2.0",
     "duckdb==1.0.0",
     "requests>=2.32.3",
     "prefect-github>=0.2.7",
@@ -37,6 +36,7 @@ dependencies = [
     "simple-salesforce==1.12.6",
     "pandas-gbq==0.23.1",
     "paramiko>=3.5.0",
+    "pandas>=2.2.3",
 ]
 requires-python = ">=3.10"
 readme = "README.md"
@@ -60,7 +60,7 @@ aws = [
     "awswrangler>=2.20.1, <3.0",
     "prefect-aws>=0.4.19",
 ]
-sap = ["pyrfc"]
+sap = ["pyrfc==3.3.1"]
 
 [tool.rye]
 managed = true
diff --git a/requirements-dev.lock b/requirements-dev.lock
index cafb4443b..5948dbce8 100644
--- a/requirements-dev.lock
+++ b/requirements-dev.lock
@@ -405,7 +405,7 @@ packaging==24.1
     # via pytest
 paginate==0.5.6
     # via mkdocs-material
-pandas==2.2.2
+pandas==2.2.3
     # via db-dtypes
     # via mkdocs-table-reader-plugin
     # via pandas-gbq
diff --git a/requirements.lock b/requirements.lock
index 5b5b76e9f..0a10a86bf 100644
--- a/requirements.lock
+++ b/requirements.lock
@@ -234,7 +234,7 @@ packaging==24.1
     # via google-cloud-bigquery
     # via pandas-gbq
     # via prefect
-pandas==2.2.2
+pandas==2.2.3
    # via db-dtypes
    # via pandas-gbq
    # via viadot2
diff --git a/src/viadot/sources/sap_rfc.py b/src/viadot/sources/sap_rfc.py
index cf2ab2b23..6c089c868 100755
--- a/src/viadot/sources/sap_rfc.py
+++ b/src/viadot/sources/sap_rfc.py
@@ -714,6 +714,7 @@ def __init__(
 
         if rfc_unique_id is not None:
             self.rfc_unique_id = list(set(rfc_unique_id))
+            self._rfc_unique_id_len = {}
         else:
             self.rfc_unique_id = rfc_unique_id
 
@@ -966,24 +967,24 @@ def query(self, sql: str, sep: str | None = None) -> None:  # noqa: C901, PLR0912
         col_length_total = 0
         if isinstance(self.rfc_unique_id[0], str):
             character_limit = self.rfc_total_col_width_character_limit
-            for ref_column in self.rfc_unique_id:
-                col_length_reference_column = int(
+            for rfc_unique_col in self.rfc_unique_id:
+                rfc_unique_col_len = int(
                     self.call(
                         "DDIF_FIELDINFO_GET",
                         TABNAME=table_name,
-                        FIELDNAME=ref_column,
+                        FIELDNAME=rfc_unique_col,
                     )["DFIES_TAB"][0]["LENG"]
                 )
-                if col_length_reference_column > int(
+                if rfc_unique_col_len > int(
                     self.rfc_total_col_width_character_limit / 4
                 ):
-                    msg = f"{ref_column} can't be used as unique column, too large."
+                    msg = f"{rfc_unique_col} can't be used as unique column, too large."
                     raise ValueError(msg)
                 local_limit = (
-                    self.rfc_total_col_width_character_limit
-                    - col_length_reference_column
+                    self.rfc_total_col_width_character_limit - rfc_unique_col_len
                 )
                 character_limit = min(local_limit, character_limit)
+                self._rfc_unique_id_len[rfc_unique_col] = rfc_unique_col_len
         else:
             character_limit = self.rfc_total_col_width_character_limit
 
@@ -995,21 +996,21 @@ def query(self, sql: str, sep: str | None = None) -> None:  # noqa: C901, PLR0912
                 cols.append(col)
             else:
                 if isinstance(self.rfc_unique_id[0], str) and all(
-                    rfc_col not in cols for rfc_col in self.rfc_unique_id
+                    rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
                 ):
-                    for rfc_col in self.rfc_unique_id:
-                        if rfc_col not in cols:
-                            cols.append(rfc_col)
+                    for rfc_unique_col in self.rfc_unique_id:
+                        if rfc_unique_col not in cols:
+                            cols.append(rfc_unique_col)
                 lists_of_columns.append(cols)
                 cols = [col]
                 col_length_total = int(col_length)
 
         if isinstance(self.rfc_unique_id[0], str) and all(
-            rfc_col not in cols for rfc_col in self.rfc_unique_id
+            rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
         ):
-            for rfc_col in self.rfc_unique_id:
-                if rfc_col not in cols:
-                    cols.append(rfc_col)
+            for rfc_unique_col in self.rfc_unique_id:
+                if rfc_unique_col not in cols:
+                    cols.append(rfc_unique_col)
         lists_of_columns.append(cols)
 
         columns = lists_of_columns
@@ -1040,6 +1041,30 @@ def _get_alias(self, column: str) -> str:
     def _get_client_side_filter_cols(self):
         return [f[1].split()[0] for f in self.client_side_filters.items()]
 
+    def _adjust_whitespaces(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Pad unique-column values with trailing whitespace.
+
+        Pad each value of every unique column up to the column length declared in
+        SAP metadata, so that all subquery chunks carry identical merge-key values.
+
+        """
+        for rfc_unique_col in self.rfc_unique_id:
+            # The column length declared in SAP metadata
+            # (looked up via DDIF_FIELDINFO_GET in `query()`).
+            rfc_unique_column_len = self._rfc_unique_id_len[rfc_unique_col]
+            actual_length_of_field = df[rfc_unique_col].str.len()
+            # Rows whose values are shorter than
+            # the declared column length.
+            rows_missing_whitespaces = actual_length_of_field < rfc_unique_column_len
+            if any(rows_missing_whitespaces):
+                # Append exactly the number of whitespaces missing in each row.
+                logger.info(f"Adding whitespaces for {rfc_unique_col} column")
+                n_missing_whitespaces = rfc_unique_column_len - actual_length_of_field
+                df.loc[rows_missing_whitespaces, rfc_unique_col] += np.char.multiply(
+                    " ", n_missing_whitespaces[rows_missing_whitespaces]
+                )
+        return df
+
     # TODO: refactor to remove linter warnings and so this can be tested.
     @add_viadot_metadata_columns
     def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR0912, PLR0915
@@ -1117,7 +1142,6 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR0912, PLR0915
             record_key = "WA"
             data_raw = np.array(response["DATA"])
             del response
-
             # If reference columns are provided, it's not necessary to remove
             # any extra row.
             if not isinstance(self.rfc_unique_id[0], str):
@@ -1126,22 +1150,15 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame:  # noqa: C901, PLR0912, PLR0915
                 )
             else:
                 start = False
-
             records = list(_gen_split(data_raw, sep, record_key))
             del data_raw
-
             if (
                 isinstance(self.rfc_unique_id[0], str)
                 and list(df.columns) != fields
             ):
                 df_tmp = pd.DataFrame(columns=fields)
                 df_tmp[fields] = records
-                # SAP adds whitespaces to the first extracted column value.
-                # If whitespace is in unique column, it must be removed to make
-                # a proper merge.
-                for col in self.rfc_unique_id:
-                    df_tmp[col] = df_tmp[col].str.strip()
-                    df[col] = df[col].str.strip()
+                df_tmp = self._adjust_whitespaces(df_tmp)
                 df = pd.merge(df, df_tmp, on=self.rfc_unique_id, how="outer")
             elif not start:
                 df[fields] = records
diff --git a/tests/unit/test_sap_rfc_2.py b/tests/unit/test_sap_rfc_2.py
index 645e3ee8a..c500403a3 100644
--- a/tests/unit/test_sap_rfc_2.py
+++ b/tests/unit/test_sap_rfc_2.py
@@ -1,5 +1,7 @@
 from collections import OrderedDict
 
+from pandas import DataFrame
+
 from viadot.utils import skip_test_on_missing_extra
 
 from .test_sap_rfc import (
@@ -104,3 +106,14 @@ def test___build_pandas_filter_query():
         sap._build_pandas_filter_query(sap.client_side_filters)
         == "thirdlongcolname == 01234"
     ), sap._build_pandas_filter_query(sap.client_side_filters)
+
+
+def test__adjust_whitespaces():
+    sap.rfc_unique_id = ["column1", "column2"]
+    sap._rfc_unique_id_len = {"column1": 5, "column2": 4}
+    data = {"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]}
+    df = DataFrame(data)
+    df = sap._adjust_whitespaces(df)
+    col_values_len = df.map(len)
+    check_if_length_match = col_values_len == list(sap._rfc_unique_id_len.values())
+    assert check_if_length_match.all().all()
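
Reviewer note: the standalone sketch below illustrates the padding approach that `_adjust_whitespaces` introduces and why it makes the chunked subquery results merge cleanly. It is not part of the patch; the SAP field names, declared lengths, and sample values are invented for the example, and only the `np.char.multiply` padding idiom is taken from the diff above.

# pad_merge_sketch.py - hypothetical example, not included in the PR
import numpy as np
import pandas as pd

# Column lengths as they would be read from SAP metadata (DDIF_FIELDINFO_GET -> LENG).
DECLARED_LEN = {"MATNR": 6}

# Two column chunks of the same table; SAP may return the key column with or
# without trailing blanks depending on which chunk it was extracted with.
left = pd.DataFrame({"MATNR": ["A1    ", "B2    "], "WERKS": ["0001", "0002"]})
right = pd.DataFrame({"MATNR": ["A1", "B2"], "MAKTX": ["Bolt", "Nut"]})


def adjust_whitespaces(df: pd.DataFrame) -> pd.DataFrame:
    """Pad key columns with trailing spaces up to their declared SAP length."""
    for col, declared_len in DECLARED_LEN.items():
        actual_len = df[col].str.len()
        too_short = actual_len < declared_len
        if too_short.any():
            # Append exactly the number of missing blanks to each short value.
            df.loc[too_short, col] += np.char.multiply(
                " ", (declared_len - actual_len)[too_short]
            )
    return df


# Without padding, "A1" != "A1    " and the outer merge yields four half-empty
# rows; after padding, both chunks share identical keys and merge into two rows.
merged = pd.merge(left, adjust_whitespaces(right), on="MATNR", how="outer")
print(merged)

The removed `.str.strip()` calls addressed the same key mismatch by trimming both sides before merging; the new code pads to the declared width instead, so the merge keys keep the fixed-width form SAP reports.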