Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ Changed credentials logic in sap_rfc to get credentials from KeyVault #866

Merged
merged 17 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Added
- Added option for sap_rfc connector to get credentials from Azure KeyVault or directly passing dictionary inside flow.

### Fixed
### Fixed

### Changed

Expand Down
8 changes: 8 additions & 0 deletions viadot/flows/sap_rfc_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def __init__(
rfc_total_col_width_character_limit: int = 400,
rfc_unique_id: List[str] = None,
sap_credentials: dict = None,
saprfc_credentials_key: str = "SAP",
Rafalz13 marked this conversation as resolved.
Show resolved Hide resolved
env: str = "PROD",
Rafalz13 marked this conversation as resolved.
Show resolved Hide resolved
output_file_extension: str = ".parquet",
local_file_path: str = None,
file_sep: str = "\t",
Expand Down Expand Up @@ -67,6 +69,8 @@ def __init__(
...
)
sap_credentials (dict, optional): The credentials to use to authenticate with SAP. By default, they're taken from the local viadot config.
saprfc_credentials_key (str, optional): Azure KV secret. Defaults to "SAP".
env (str, optional): SAP environment. Defaults to "PROD".
output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet".
local_file_path (str, optional): Local destination path. Defaults to None.
file_sep(str, optional): The separator to use in the CSV. Defaults to "\t".
Expand All @@ -91,6 +95,8 @@ def __init__(
self.rfc_total_col_width_character_limit = rfc_total_col_width_character_limit
self.rfc_unique_id = rfc_unique_id
self.sap_credentials = sap_credentials
self.saprfc_credentials_key = saprfc_credentials_key
self.env = env
self.output_file_extension = output_file_extension
self.local_file_path = local_file_path
self.file_sep = file_sep
Expand Down Expand Up @@ -121,6 +127,8 @@ def gen_flow(self) -> Flow:
rfc_unique_id=self.rfc_unique_id,
alternative_version=self.alternative_version,
credentials=self.sap_credentials,
saprfc_credentials_key=self.saprfc_credentials_key,
env=self.env,
flow=self,
)
if self.validate_df_dict:
Expand Down
38 changes: 22 additions & 16 deletions viadot/sources/sap_rfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,17 @@ def __init__(
"""

self._con = None
DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV")
saprfc_credentials_key = kwargs.pop("saprfc_credentials_key")
env = kwargs.pop("env")

credentials = kwargs.pop("credentials", None)
if credentials is None:
credentials = DEFAULT_CREDENTIALS
if credentials is None:
raise CredentialError("Missing credentials.")
logger.warning(
"Your credentials will use DEV environment. If you would like to use different one - please specified it."
f"Your credentials will use {env} environment from local config. If you would like to use different one - please specified it in sap_credentials parameter."
)
credentials = local_config.get(saprfc_credentials_key).get(env)
if credentials is None:
raise CredentialError(f"Missing {env} credentials!")

super().__init__(*args, credentials=credentials, **kwargs)

Expand Down Expand Up @@ -455,9 +456,11 @@ def _get_columns(self, sql: str, aliased: bool = False) -> List[str]:
self.aliases_keyed_by_columns = aliases_keyed_by_columns

columns = [
aliases_keyed_by_columns[col]
if col in aliases_keyed_by_columns
else col
(
aliases_keyed_by_columns[col]
if col in aliases_keyed_by_columns
else col
)
for col in columns
]

Expand Down Expand Up @@ -699,16 +702,17 @@ def __init__(
"""

self._con = None
DEFAULT_CREDENTIALS = local_config.get("SAP").get("DEV")
saprfc_credentials_key = kwargs.pop("saprfc_credentials_key")
env = kwargs.pop("env")

credentials = kwargs.pop("credentials", None)
if credentials is None:
credentials = DEFAULT_CREDENTIALS
if credentials is None:
raise CredentialError("Missing credentials.")
logger.warning(
"Your credentials will use DEV environment. If you would like to use different one - please specified it in 'sap_credentials' variable inside the flow."
f"Your credentials will use {env} environment from local config. If you would like to use different one - please specified it in sap_credentials parameter."
)
credentials = local_config.get(saprfc_credentials_key).get(env)
if credentials is None:
raise CredentialError(f"Missing {env} credentials!")

super().__init__(*args, credentials=credentials, **kwargs)

Expand Down Expand Up @@ -904,9 +908,11 @@ def _get_columns(self, sql: str, aliased: bool = False) -> List[str]:
self.aliases_keyed_by_columns = aliases_keyed_by_columns

columns = [
aliases_keyed_by_columns[col]
if col in aliases_keyed_by_columns
else col
(
aliases_keyed_by_columns[col]
if col in aliases_keyed_by_columns
else col
)
for col in columns
]

Expand Down
37 changes: 37 additions & 0 deletions viadot/tasks/sap_rfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from typing import List

import pandas as pd
import json
from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs
from viadot.tasks import AzureKeyVaultSecret

try:
from viadot.sources import SAPRFC, SAPRFCV2
except ImportError:
raise
from prefect.utilities import logging

logger = logging.get_logger()


class SAPRFCToDF(Task):
Expand All @@ -20,6 +25,8 @@ def __init__(
func: str = None,
rfc_total_col_width_character_limit: int = 400,
credentials: dict = None,
saprfc_credentials_key: str = "SAP",
env: str = "PROD",
max_retries: int = 3,
retry_delay: timedelta = timedelta(seconds=10),
timeout: int = 3600,
Expand Down Expand Up @@ -52,12 +59,16 @@ def __init__(
512 characters. However, we observed SAP raising an exception even on a slightly lower number
of characters, so we add a safety margin. Defaults to 400.
credentials (dict, optional): The credentials to use to authenticate with SAP.
saprfc_credentials_key (str, optional): Azure KV secret. Defaults to "SAP".
env (str, optional): SAP environment. Defaults to "PROD".
By default, they're taken from the local viadot config.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can modify the docstring because it is a key for the credentials - it can be taken from Azure Key Vault or local config. It depends on what is possible.

Please change for the source and flow as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in task and flow

"""
self.query = query
self.sep = sep
self.replacement = replacement
self.credentials = credentials
self.saprfc_credentials_key = saprfc_credentials_key
self.env = env
self.func = func
self.rfc_total_col_width_character_limit = rfc_total_col_width_character_limit

Expand All @@ -84,6 +95,8 @@ def run(
sep: str = None,
replacement: str = "-",
credentials: dict = None,
saprfc_credentials_key: str = "SAP",
env: str = "PROD",
func: str = None,
rfc_total_col_width_character_limit: int = None,
rfc_unique_id: List[str] = None,
Expand All @@ -97,6 +110,9 @@ def run(
multiple options are automatically tried. Defaults to None.
replacement (str, optional): In case of sep is on a columns, set up a new character to replace
inside the string to avoid flow breakdowns. Defaults to "-".
credentials (dict, optional): The credentials to use to authenticate with SAP.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
credentials (dict, optional): The credentials to use to authenticate with SAP.
credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed docstring to the old one from flow. And variable name from 'credentials' to 'sap_credentials'

saprfc_credentials_key (str, optional): Azure KV secret. Defaults to "SAP".
env (str, optional): SAP environment. Defaults to "PROD".
func (str, optional): SAP RFC function to use. Defaults to None.
rfc_total_col_width_character_limit (int, optional): Number of characters by which query will be split in chunks
in case of too many columns for RFC function. According to SAP documentation, the limit is
Expand All @@ -116,6 +132,23 @@ def run(
Returns:
pd.DataFrame: DataFrame with SAP data.
"""

if isinstance(credentials, dict):
credentials_keys = list(credentials.keys())
required_credentials_params = ["sysnr", "user", "passwd", "ashost"]
for key in required_credentials_params:
if key not in credentials_keys:
self.logger.warning(
f"Required key '{key}' not found in your 'sap_credentials' dictionary!"
)
credentials = None
Rafalz13 marked this conversation as resolved.
Show resolved Hide resolved

if credentials is None:
credentials_str = AzureKeyVaultSecret(
secret=saprfc_credentials_key,
).run()
credentials = json.loads(credentials_str).get(env)
Rafalz13 marked this conversation as resolved.
Show resolved Hide resolved

if query is None:
raise ValueError("Please provide the query.")
Rafalz13 marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -128,6 +161,8 @@ def run(
sep=sep,
replacement=replacement,
credentials=credentials,
saprfc_credentials_key=saprfc_credentials_key,
env=env,
func=func,
rfc_total_col_width_character_limit=rfc_total_col_width_character_limit,
rfc_unique_id=rfc_unique_id,
Expand All @@ -136,6 +171,8 @@ def run(
sap = SAPRFC(
sep=sep,
credentials=credentials,
saprfc_credentials_key=saprfc_credentials_key,
env=env,
func=func,
rfc_total_col_width_character_limit=rfc_total_col_width_character_limit,
)
Expand Down
Loading