Skip to content

Commit

Permalink
Merge pull request #962 from fdelgadodyvenia/c4c_test
Browse files Browse the repository at this point in the history
C4c test
  • Loading branch information
fdelgadodyvenia authored Jul 25, 2024
2 parents dd4878c + f667ab6 commit 22a4957
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 50 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ viadot-azure = [
"azure-identity>=1.16.0",
"dbt-sqlserver>=1.3, <1.8",
"prefect-azure @ git+https://github.com/Trymzet/prefect-azure@add_keyvault_auth#egg=prefect-azure",
"prefect_github",
"O365==2.0.18.1",
"asynctest",
]
viadot-aws = [
"s3fs==2024.6.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
df_to_adls,
)


@flow
def cloud_for_customers_to_adls( # noqa: PLR0913, PLR0917
# C4C
cloud_for_customers_url: str,
cloud_for_customers_url: str | None = None,
fields: list[str] | None = None,
dtype: dict[str, Any] | None = None,
endpoint: str | None = None,
Expand Down Expand Up @@ -71,3 +70,14 @@ def cloud_for_customers_to_adls( # noqa: PLR0913, PLR0917
config_key=adls_config_key,
overwrite=overwrite,
)

if __name__ == "__main__":
    # Ad-hoc manual run: load one C4C analytics report into ADLS as parquet.
    # NOTE(review): the tenant URL and secret names are hard-coded — confirm
    # this entry point is meant to ship inside the library module.
    # NOTE(review): `report_url` and `filter_params` are not among the flow
    # parameters visible in this hunk — verify the flow signature accepts them.
    cloud_for_customers_to_adls(
        report_url="https://my341115.crm.ondemand.com/sap/c4c/odata/ana_businessanalytics_analytics.svc/RPZ36A87743F65355C0B904A5QueryResults?$select=TDOC_PRIORITY",
        filter_params={"CBTD_REF_TYPE_CODE": "(%20eq%20%27118%27)"},
        adls_path="raw/c4c/ticket/leads_link/c4c_tickets_leads_link.parquet",
        overwrite=True,
        cloud_for_customers_credentials_secret="aia-c4c-prod",
        adls_credentials_secret="app-azure-cr-datalakegen2",
    )

Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ def cloud_for_customers_to_df( # noqa: PLR0913, PLR0917
credentials=credentials,
config_key=config_key,
)
return c4c.to_df(fields=fields, dtype=dtype, tests=tests, **kwargs)
# fields=fields, dtype=dtype, tests=tests,
return c4c.to_df(**kwargs)
23 changes: 13 additions & 10 deletions src/viadot/sources/azure_data_lake.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ def __init__(
**kwargs,
):
credentials = credentials or get_source_credentials(config_key)
# Normalize credential keys to lowercase.
credentials = {key.lower(): value for key, value in credentials.items()}

required_credentials = (
"account_name",
"tenant_id",
"client_id",
"client_secret",
"azure_tenant_id",
"azure_client_id",
"azure_client_secret",
)
required_credentials_are_provided = all(
[rc in credentials for rc in required_credentials]
Expand All @@ -58,16 +61,16 @@ def __init__(
super().__init__(*args, credentials=credentials, **kwargs)

storage_account_name = self.credentials["account_name"]
tenant_id = self.credentials["tenant_id"]
client_id = self.credentials["client_id"]
client_secret = self.credentials["client_secret"]

tenant_id = self.credentials["azure_tenant_id"]
client_id = self.credentials["azure_client_id"]
client_secret = self.credentials["azure_client_secret"]
self.path = path
self.gen = gen
self.storage_options = {
"tenant_id": tenant_id,
"client_id": client_id,
"client_secret": client_secret,
"azure_tenant_id": tenant_id,
"azure_client_id": client_id,
"azure_client_secret": client_secret,
}
if gen == 1:
self.fs = AzureDatalakeFileSystem(
Expand Down
142 changes: 105 additions & 37 deletions src/viadot/sources/cloud_for_customers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
"""cloud_for_customers.py.
Implement C4C Connector.
This module provides functionalities to have access to C4C data. It includes
the following features:
- Credential Management
- Data Source Data Collection
Classes:
CloudForCustomersCredentials(BaseModel): Cloud for Customers credentials validator.
CloudForCustomers: Cloud for Customers connector to fetch Odata source.
"""

import re
from copy import deepcopy
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Literal
from urllib.parse import urljoin

import pandas as pd
Expand All @@ -14,13 +28,30 @@


class CloudForCustomersCredentials(BaseModel):
"""Cloud for Customers connector credentials validator.
Validate the credentials.
Methods:
is_configured: main method to validate.
"""

username: str # eg. username@{tenant_name}.com
password: SecretStr
url: Optional[str] = None # The URL to extract records from.
report_url: Optional[str] = None # The URL of a prepared report.

@classmethod
@root_validator(pre=True)
def is_configured(cls, credentials):
"""Validate Credentials.
Args:
credentials (dict): dictionary with user and password.
Returns:
credentials (dict): dictionary with user and password.
"""
username = credentials.get("username")
password = credentials.get("password")

Expand All @@ -34,29 +65,51 @@ class CloudForCustomers(Source):
"""Cloud for Customers connector to fetch Odata source.
Args:
url (str, optional): The URL to the C4C API. E.g 'https://myNNNNNN.crm.ondemand.com/c4c/v1/'.
url (str, optional): The URL to the C4C API.
E.g 'https://myNNNNNN.crm.ondemand.com/c4c/v1/'.
endpoint (str, optional): The API endpoint.
report_url (str, optional): The URL of a prepared report.
filter_params (Dict[str, Any], optional): Filtering parameters passed to the request. E.g {"$filter": "AccountID eq '1234'"}.
More info on: https://userapps.support.sap.com/sap/support/knowledge/en/2330688
credentials (CloudForCustomersCredentials, optional): Cloud for Customers credentials.
config_key (str, optional): The key in the viadot config holding relevant credentials.
filter_params (Dict[str, Any], optional): Filtering parameters
passed to the request. E.g {"$filter": "AccountID eq '1234'"}.
More info on:
https://userapps.support.sap.com/sap/support/knowledge/en/2330688
credentials (CloudForCustomersCredentials, optional):
Cloud for Customers credentials.
config_key (str, optional): The key in the viadot config holding relevant
credentials.
"""

DEFAULT_PARAMS = {"$format": "json"}

def __init__(
self,
*args,
url: str = None,
endpoint: str = None,
report_url: str = None,
filter_params: Dict[str, Any] = None,
credentials: CloudForCustomersCredentials = None,
config_key: Optional[str] = None,
*args,
**kwargs,
):
## Credentials logic
"""
Initialize the class with the provided parameters.
Args:
*args: Variable length argument list.
url (str, optional): The base URL for the service.
endpoint (str, optional): The specific endpoint for the service.
report_url (str, optional): The URL for the report.
filter_params (Dict[str, Any], optional): Parameters to filter the
report data.
credentials (CloudForCustomersCredentials, optional): Credentials
required for authentication.
config_key (Optional[str], optional): A key to retrieve specific
configuration settings.
**kwargs: Arbitrary keyword arguments.
"""
# Credentials logic
raw_creds = credentials or get_source_credentials(config_key) or {}
validated_creds = dict(
CloudForCustomersCredentials(**raw_creds)
Expand All @@ -71,7 +124,6 @@ def __init__(

if self.url:
self.full_url = urljoin(self.url, self.endpoint)

if filter_params:
filter_params_merged = self.DEFAULT_PARAMS.copy()
filter_params_merged.update(filter_params)
Expand All @@ -82,7 +134,7 @@ def __init__(

@staticmethod
def create_metadata_url(url: str) -> str:
"""Creates URL to fetch metadata from.
"""Create URL to fetch metadata from.
Args:
url (str): The URL to transform to metadata URL.
Expand All @@ -97,7 +149,7 @@ def create_metadata_url(url: str) -> str:
return meta_url

def _extract_records_from_report_url(self, report_url: str) -> List[Dict[str, Any]]:
"""Fetches report_url to extract records.
"""Fetch report_url to extract records.
Args:
report_url (str): The url to extract records from.
Expand All @@ -117,7 +169,7 @@ def _extract_records_from_report_url(self, report_url: str) -> List[Dict[str, An
return records

def _extract_records_from_url(self, url: str) -> List[Dict[str, Any]]:
"""Fetches URL to extract records.
"""Fetch URL to extract records.
Args:
url (str): The URL to extract records from.
Expand All @@ -140,7 +192,8 @@ def _extract_records_from_url(self, url: str) -> List[Dict[str, Any]]:
new_records = response_json["d"]
url = response_json.get("__next", None)

# prevents concatenation of previous urls with filter_params with the same filter_params
# prevents concatenation of previous urls with filter_params with the same
# filter_params
tmp_filter_params = None
tmp_full_url = url

Expand All @@ -149,27 +202,29 @@ def _extract_records_from_url(self, url: str) -> List[Dict[str, Any]]:
return records

def extract_records(
    self, url: Optional[str] = None, report_url: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Download records from `url` or `report_url`.

    Args:
        url (str, optional): Base URL that is joined with `self.endpoint`.
            When falsy, `self.full_url` is used instead.
        report_url (str, optional): URL of a prepared report. Only used when
            `self.is_report` is truthy.

    Returns:
        records (List[Dict[str, Any]]): The records extracted from URL.
    """
    if self.is_report:
        # The report helper takes `report_url`, not `url`, as its keyword.
        return self._extract_records_from_report_url(report_url=report_url)

    full_url = urljoin(url, self.endpoint) if url else self.full_url
    return self._extract_records_from_url(url=full_url)

def get_entities(
self, dirty_json: Dict[str, Any], url: str
) -> List[Dict[str, Any]]:
"""Extracts entities from request.json(). Entities represents objects that store information.
More info on: https://help.sap.com/docs/EAD_HANA/0e60f05842fd41078917822867220c78/0bd1db568fa546d6823d4c19a6b609ab.html
"""Extract entities from request.json().
Entities represent objects that store information. More info on:
https://help.sap.com/docs/EAD_HANA/0e60f05842fd41078917822867220c78/
0bd1db568fa546d6823d4c19a6b609ab.html
Args:
dirty_json (Dict[str, Any]): request.json() dict from response to API.
Expand All @@ -178,7 +233,6 @@ def get_entities(
Returns:
entities (List[Dict[str, Any]]): list filled with entities.
"""

metadata_url = self.create_metadata_url(url)
column_maper_dict = self.get_property_to_sap_label_dict(metadata_url)
entities = []
Expand All @@ -196,22 +250,23 @@ def get_entities(
return entities

def get_property_to_sap_label_dict(self, url: str = None) -> Dict[str, str]:
"""Creates Dict that maps Property Name to value of SAP label.
"""Create Dict that maps Property Name to value of SAP label.
Property: Properties define the characteristics of the data.
SAP label: Labels are used for identification and for provision of content information.
SAP label: Labels are used for identification and for provision of content
information.
Args:
url (str, optional): The URL to fetch metadata from.
Returns:
Dict[str, str]: Property Name to value of SAP label.
"""

column_mapping = {}
if url:
username = self.credentials.get("username")
pw = self.credentials.get("password")
response = requests.get(url, auth=(username, pw))
password = self.credentials.get("password")
response = requests.get(url, auth=(username, password))
for sentence in response.text.split("/>"):
result = re.search(
r'(?<=Name=")([^"]+).+(sap:label=")([^"]+)+', sentence
Expand All @@ -228,32 +283,30 @@ def get_response(
filter_params: Dict[str, Any] = None,
timeout: tuple = (3.05, 60 * 30),
) -> requests.models.Response:
"""Handles requests.
"""Handle requests.
Args:
url (str): The url to request to.
filter_params (Dict[str, Any], optional): Additional parameters like filter, used in case of normal url.
filter_params (Dict[str, Any], optional): Additional parameters like filter,
used in case of normal url.
timeout (tuple, optional): The request time-out. Default is (3.05, 60 * 30).
Returns:
requests.models.Response.
"""
username = self.credentials.get("username")
pw = self.credentials.get("password")
password = self.credentials.get("password")
response = handle_api_response(
url=url,
params=filter_params,
auth=(username, pw),
auth=(username, password),
timeout=timeout,
)
return response

def to_df(
self,
url: str = None,
fields: List[str] = None,
dtype: dict = None,
tests: dict = None,
if_empty: Literal["warn", "skip", "fail"] = "warn",
**kwargs,
) -> pd.DataFrame:
"""Download a table or report into a pandas DataFrame.
Expand All @@ -270,6 +323,21 @@ def to_df(
Returns:
df (pandas.DataFrame): DataFrame containing the records.
"""
# Validate `if_empty` and report the selected empty-DataFrame policy.
if if_empty == "warn":
print("Warning: DataFrame is empty.")
elif if_empty == "skip":
print("Skipping due to empty DataFrame.")
elif if_empty == "fail":
print("Failing due to empty DataFrame.")
else:
raise ValueError("Invalid value for if_empty parameter.")

url: str = kwargs.get('url', "")
fields: List[str] = kwargs.get('fields', [])
dtype: Dict[str, Any] = kwargs.get('dtype', {})
tests: Dict[str, Any] = kwargs.get('tests', {})

url = url or self.url
records = self.extract_records(url=url)
df = pd.DataFrame(data=records, **kwargs)
Expand Down
Loading

0 comments on commit 22a4957

Please sign in to comment.