Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 [BUGFIX] For SAPRFCV2 adding whitespaces to match sub - queries #1100

Merged
merged 20 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ecda25e
⚡️ adding whitespaces to match SAP data type characters length to bul…
marcinpurtak Oct 11, 2024
3faa39c
Changed for loop to vector approach
marcinpurtak Oct 23, 2024
171e8a4
🚧 Changed df to df_tmp in whitespace filler
marcinpurtak Oct 24, 2024
4f67506
Added whitespace logging
marcinpurtak Oct 25, 2024
135bafa
♻️ Refactored logging of the exception
marcinpurtak Nov 4, 2024
c5d270b
Merge branch '2.0' into saprfcv2-adding-whitespaces
marcinpurtak Nov 5, 2024
82ef729
Update src/viadot/sources/sap_rfc.py
marcinpurtak Nov 6, 2024
a1652b0
Update src/viadot/sources/sap_rfc.py
marcinpurtak Nov 6, 2024
0dfb6c6
Update src/viadot/sources/sap_rfc.py
marcinpurtak Nov 6, 2024
06a6a44
Update src/viadot/sources/sap_rfc.py
marcinpurtak Nov 6, 2024
491d280
🎨 Changed unique variables name
marcinpurtak Nov 15, 2024
f3d2187
🔥 Removed try except from whitespace len check
marcinpurtak Nov 15, 2024
1362d28
⚡️ Wrapped adding whitespaces in sub function
marcinpurtak Nov 19, 2024
328b6c4
📝 Comments speeling changed
marcinpurtak Nov 19, 2024
05031d1
✅ Added tests for _adjust_whitespaces sap function
marcinpurtak Nov 19, 2024
ea1367f
✨ updated .gitignore with rye
marcinpurtak Nov 19, 2024
d3c5ca0
⬆️ Update pandas requirement as some funcs depend on pandas >=2
trymzet Nov 21, 2024
72ffbd8
➕ Update sap extra requirements
trymzet Nov 21, 2024
e1241da
➖ Remove the non-required(?) Cython
trymzet Nov 21, 2024
a4bf597
Merge branch '2.0' into saprfcv2-adding-whitespaces
marcinpurtak Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,6 @@ profiles.yaml

# AWS
.aws

# Rye
.rye/
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies = [
"openpyxl>=3.0.0",
"prefect>=2.19.7, <3",
"prefect-sqlalchemy>=0.4.3",
"pandas>=1.2.0",
"duckdb==1.0.0",
"requests>=2.32.3",
"prefect-github>=0.2.7",
Expand All @@ -37,6 +36,7 @@ dependencies = [
"simple-salesforce==1.12.6",
"pandas-gbq==0.23.1",
"paramiko>=3.5.0",
"pandas>=2.2.3",
]
requires-python = ">=3.10"
readme = "README.md"
Expand All @@ -60,7 +60,7 @@ aws = [
"awswrangler>=2.20.1, <3.0",
"prefect-aws>=0.4.19",
]
sap = ["pyrfc"]
sap = ["pyrfc==3.3.1"]

[tool.rye]
managed = true
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ packaging==24.1
# via pytest
paginate==0.5.6
# via mkdocs-material
pandas==2.2.2
pandas==2.2.3
# via db-dtypes
# via mkdocs-table-reader-plugin
# via pandas-gbq
Expand Down
2 changes: 1 addition & 1 deletion requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ packaging==24.1
# via google-cloud-bigquery
# via pandas-gbq
# via prefect
pandas==2.2.2
pandas==2.2.3
# via db-dtypes
# via pandas-gbq
# via viadot2
Expand Down
65 changes: 41 additions & 24 deletions src/viadot/sources/sap_rfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ def __init__(

if rfc_unique_id is not None:
self.rfc_unique_id = list(set(rfc_unique_id))
self._rfc_unique_id_len = {}
else:
self.rfc_unique_id = rfc_unique_id

Expand Down Expand Up @@ -966,24 +967,24 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
col_length_total = 0
if isinstance(self.rfc_unique_id[0], str):
character_limit = self.rfc_total_col_width_character_limit
for ref_column in self.rfc_unique_id:
col_length_reference_column = int(
for rfc_unique_col in self.rfc_unique_id:
rfc_unique_col_len = int(
self.call(
"DDIF_FIELDINFO_GET",
TABNAME=table_name,
FIELDNAME=ref_column,
FIELDNAME=rfc_unique_col,
)["DFIES_TAB"][0]["LENG"]
)
if col_length_reference_column > int(
if rfc_unique_col_len > int(
self.rfc_total_col_width_character_limit / 4
):
msg = f"{ref_column} can't be used as unique column, too large."
msg = f"{rfc_unique_col} can't be used as unique column, too large."
raise ValueError(msg)
local_limit = (
self.rfc_total_col_width_character_limit
- col_length_reference_column
self.rfc_total_col_width_character_limit - rfc_unique_col_len
)
character_limit = min(local_limit, character_limit)
self._rfc_unique_id_len[rfc_unique_col] = rfc_unique_col_len
else:
character_limit = self.rfc_total_col_width_character_limit

Expand All @@ -995,21 +996,21 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
cols.append(col)
else:
if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)
cols = [col]
col_length_total = int(col_length)

if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)

columns = lists_of_columns
Expand Down Expand Up @@ -1040,6 +1041,30 @@ def _get_alias(self, column: str) -> str:
def _get_client_side_filter_cols(self):
return [f[1].split()[0] for f in self.client_side_filters.items()]

def _adjust_whitespaces(self, df: pd.DataFrame) -> pd.DataFrame:
"""Adjust the number of whitespaces.

Add whitespace characters in each row of each unique column to achieve
equal length of values in these columns, ensuring proper merging of subqueries.

"""
for rfc_unique_col in self.rfc_unique_id:
# Check in SAP metadata what is the declared
# dtype characters amount
rfc_unique_column_len = self._rfc_unique_id_len[rfc_unique_col]
actual_length_of_field = df[rfc_unique_col].str.len()
# Check which rows have fewer characters
# than specified in the column data type.
rows_missing_whitespaces = actual_length_of_field < rfc_unique_column_len
if any(rows_missing_whitespaces):
# Check how many whitespaces are missing in each row.
logger.info(f"Adding whitespaces for {rfc_unique_col} column")
n_missing_whitespaces = rfc_unique_column_len - actual_length_of_field
df.loc[rows_missing_whitespaces, rfc_unique_col] += np.char.multiply(
" ", n_missing_whitespaces[rows_missing_whitespaces]
)
return df

# TODO: refactor to remove linter warnings and so this can be tested.
@add_viadot_metadata_columns
def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR0912, PLR0915
Expand Down Expand Up @@ -1117,7 +1142,6 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
record_key = "WA"
data_raw = np.array(response["DATA"])
del response

# If reference columns are provided, it's not necessary to remove
# any extra row.
if not isinstance(self.rfc_unique_id[0], str):
Expand All @@ -1126,22 +1150,15 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
)
else:
start = False

records = list(_gen_split(data_raw, sep, record_key))
del data_raw

if (
isinstance(self.rfc_unique_id[0], str)
and list(df.columns) != fields
):
df_tmp = pd.DataFrame(columns=fields)
df_tmp[fields] = records
# SAP adds whitespaces to the first extracted column value.
# If whitespace is in unique column, it must be removed to make
# a proper merge.
for col in self.rfc_unique_id:
df_tmp[col] = df_tmp[col].str.strip()
df[col] = df[col].str.strip()
df_tmp = self._adjust_whitespaces(df_tmp)
df = pd.merge(df, df_tmp, on=self.rfc_unique_id, how="outer")
elif not start:
df[fields] = records
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/test_sap_rfc_2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections import OrderedDict

from pandas import DataFrame

from viadot.utils import skip_test_on_missing_extra

from .test_sap_rfc import (
Expand Down Expand Up @@ -104,3 +106,14 @@ def test___build_pandas_filter_query():
sap._build_pandas_filter_query(sap.client_side_filters)
== "thirdlongcolname == 01234"
), sap._build_pandas_filter_query(sap.client_side_filters)


def test__adjust_whitespaces():
sap.rfc_unique_id = ["column1", "column2"]
sap._rfc_unique_id_len = {"column1": 5, "column2": 4}
data = {"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]}
df = DataFrame(data)
df = sap._adjust_whitespaces(df)
col_values_len = df.applymap(lambda x: len(x))
check_if_length_match = col_values_len == sap._rfc_unique_id_len.values()
assert check_if_length_match.all().all()