Salesforce migration #629

Closed
wants to merge 37 commits into from
Changes from 23 commits
Commits
37 commits
3615647
✨ Migrated salesforce source
djagoda881 Feb 15, 2023
66d9063
🔥 Removed object instance
djagoda881 Feb 15, 2023
7951428
✨ Added slaesforce source to __init__
djagoda881 Feb 15, 2023
d08d07f
✅ Added unit tests to salesfource source
djagoda881 Feb 21, 2023
8b99f6a
✅ Updated test structure
djagoda881 Feb 21, 2023
773797d
✅ Migrated unit tests from viadot 1.0
djagoda881 Feb 23, 2023
7fc3688
🔐 Updated name of salesforce credentials
djagoda881 Feb 23, 2023
ee468c9
🔥 Removed if_empty parameter
djagoda881 Mar 2, 2023
4046167
✅ Updated unit tests structure
djagoda881 Mar 2, 2023
bf79a66
📝 Updated main class docstring
djagoda881 Mar 2, 2023
a3fea0e
📝 Added docstring to Slesforce class functions
djagoda881 Mar 2, 2023
f976ecd
🎨 Changed name of 'code' var to 'valid_response_code'
djagoda881 Mar 2, 2023
f5deb9a
🎨 Changed name of 'response' var to 'response_code'
djagoda881 Mar 2, 2023
ddee24f
🔥 Removed response_code var declaration
djagoda881 Mar 3, 2023
0e07e8d
🎨 Improved connetion with env
djagoda881 Mar 6, 2023
3aaad06
💡 Updated comment in download fuction
djagoda881 Mar 6, 2023
d0f0416
🔐 Updated config_key names
djagoda881 Mar 6, 2023
63913e0
Revert "🔐 Updated config_key names"
djagoda881 Mar 6, 2023
a8fa335
🔐 Updated config_key name after git revert
djagoda881 Mar 6, 2023
b408db7
✅ Improved cleaning after tests
djagoda881 Mar 6, 2023
ab1d1af
✅ 🔥 Removed test `test_upsert_empty`
djagoda881 Mar 6, 2023
de73e4c
✅ 🎨Improved structure of upsert test
djagoda881 Mar 6, 2023
61bf161
✅🎨Improved structure of test_to_df
djagoda881 Mar 6, 2023
47e177e
✅ 🎨 Improved structure of `test_upsert`
djagoda881 Mar 6, 2023
3e2ef08
✅ 🎨 Changed name of variable
djagoda881 Mar 6, 2023
97e6dbb
🎨 Improved retrieving externalID
djagoda881 Mar 6, 2023
5625cee
⏪ Removed tests code
djagoda881 Mar 6, 2023
4215dea
✅ Chenged testing structure in upsert tests
djagoda881 Mar 9, 2023
b6e311e
🎨Rename `external_id` var to `external_id_column`
djagoda881 Mar 9, 2023
c8bdf4e
✅ Added inserting two rows
djagoda881 Mar 9, 2023
5adfbe2
✅ Improved `test_to_df`
djagoda881 Mar 9, 2023
570b579
✅📝Improved docstings in Salesforce class
djagoda881 Mar 9, 2023
4415009
🐛 Corrected variable name
djagoda881 Mar 10, 2023
990775a
✅ Added bulk upsert test and refeactored tests
djagoda881 Mar 10, 2023
03d07c4
📝 Updated docstrings in Salesforce source
djagoda881 Mar 20, 2023
7885ba7
✅ Removed try except from tests
djagoda881 Mar 21, 2023
ea0fa2a
✅ Renamed test and fixed typos
djagoda881 Mar 21, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `ExchangeRates` source to the library.
- Added `from_df()` method to `Azure Data Lake` source
- Added `SAPRFC` source to the library.
- Added `Salesforce` source to the library.

### Changed
- Added `SQLServerToDF` task
87 changes: 87 additions & 0 deletions tests/unit/test_salesforce.py
@@ -0,0 +1,87 @@
import pandas as pd
import pytest
from viadot.sources import Salesforce

TABLE_TO_DOWNLOAD = "Account"
TABLE_TO_UPSERT = "Contact"
TEST_LAST_NAME = "prefect-viadot-test"
ID_TO_UPSERT = "0035E00001YGWK3QAP"


@pytest.fixture(scope="session")
def salesforce():
s = Salesforce(config_key="salesforce_dev")
yield s


@pytest.fixture(scope="session")
def test_df_data(salesforce):
data = {
"Id": [ID_TO_UPSERT],
"LastName": [TEST_LAST_NAME],
}
df = pd.DataFrame(data=data)

yield df

sf = salesforce.salesforce
sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"})


@pytest.fixture(scope="session")
def test_df_external(salesforce):
data = {
"LastName": [TEST_LAST_NAME],
"SAPContactId__c": ["111"],
}
df = pd.DataFrame(data=data)
yield df

sf = salesforce.salesforce
sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"})


def test_upsert_external_id_correct(salesforce, test_df_external):
    salesforce.upsert(
        df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPContactId__c"
    )

sf = salesforce.salesforce
result = sf.query(
f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'"
)

assert result["records"][0]["LastName"] == TEST_LAST_NAME


def test_upsert_external_id_wrong(salesforce, test_df_external):
with pytest.raises(ValueError):
salesforce.upsert(
df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPId"
)


def test_download_no_query(salesforce):
ordered_dict = salesforce.download(table=TABLE_TO_DOWNLOAD)
assert len(ordered_dict) > 0


def test_download_with_query(salesforce):
query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD}"
Contributor: Is it possible to add some limit to the query to not download the whole table?

ordered_dict = salesforce.download(query=query)
assert len(ordered_dict) > 0
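
Responding to the contributor's question above: a minimal sketch of a bounded download, assuming the standard SOQL LIMIT clause and reusing the module's salesforce fixture and TABLE_TO_DOWNLOAD constant. The test name and the limit of 100 are illustrative.

def test_download_with_query_limit(salesforce):
    # Cap the result set so the test does not pull the whole table.
    query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD} LIMIT 100"
    ordered_dict = salesforce.download(query=query)
    assert 0 < len(ordered_dict) <= 100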


def test_to_df(salesforce):
    df = salesforce.to_df(table=TABLE_TO_DOWNLOAD)
    assert not df.empty
    assert len(df.columns) == 98
    assert len(df.values) >= 1000


def test_upsert(salesforce, test_df_data):
salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT)
1 change: 1 addition & 0 deletions viadot/sources/__init__.py
@@ -11,3 +11,4 @@
from .s3 import S3
from .sharepoint import Sharepoint
from .redshift_spectrum import RedshiftSpectrum
from .salesforce import Salesforce
233 changes: 233 additions & 0 deletions viadot/sources/salesforce.py
@@ -0,0 +1,233 @@
from typing import Any, Dict, List, Literal, OrderedDict

import pandas as pd
from simple_salesforce import Salesforce as SF
from simple_salesforce.exceptions import SalesforceMalformedRequest
from viadot.config import get_source_credentials
from viadot.exceptions import CredentialError
from viadot.sources.base import Source


class Salesforce(Source):
"""
A class for downloading and upserting data from Salesforce.

Args:
domain (str, optional): Domain of a connection. Defaults to 'test' (sandbox).
            Can only be specified when username/password/security token credentials are provided.
        client_id (str, optional): Client ID to keep track of API calls.
Defaults to 'viadot'.
        env (Literal["DEV", "QA", "PROD"], optional): The environment to use; determines how
            the connection is configured. Defaults to 'DEV'.
credentials (Dict[str, Any], optional): Credentials to connect with Salesforce.
If not provided, will read from local config file. Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant credentials.
Defaults to None.
"""

def __init__(
self,
*args,
domain: str = "test",
client_id: str = "viadot",
env: Literal["DEV", "QA", "PROD"] = "DEV",
credentials: Dict[str, Any] = None,
config_key: str = None,
**kwargs,
):

        credentials = credentials or get_source_credentials(config_key)

        if not credentials:
            raise CredentialError("Please specify the credentials.")

super().__init__(*args, credentials=credentials, **kwargs)

        if env.upper() in ("DEV", "QA"):
self.salesforce = SF(
username=self.credentials.get("username"),
password=self.credentials.get("password"),
security_token=self.credentials.get("token"),
domain=domain,
client_id=client_id,
)

elif env.upper() == "PROD":
self.salesforce = SF(
username=self.credentials.get("username"),
password=self.credentials.get("password"),
security_token=self.credentials.get("token"),
)

else:
raise ValueError("The only available environments are DEV, QA, and PROD.")

def upsert(
self,
df: pd.DataFrame,
table: str,
external_id: str = None,
raise_on_error: bool = False,
) -> None:
"""
        Upserts the DataFrame's records into the specified table, one record at a time.

        Args:
            df (pd.DataFrame): The DataFrame to upsert.
            table (str): The table where the data should be upserted.
            external_id (str, optional): The name of the external ID column to use for the upsert.
                Defaults to None.
            raise_on_error (bool, optional): Whether to raise an exception if a record upsert fails.
                If False, only a warning is logged. Defaults to False.
"""
if df.empty:
self.logger.info("No data to upsert.")
return

if external_id and external_id not in df.columns:
raise ValueError(
f"Passed DataFrame does not contain column '{external_id}'."
)

table_to_upsert = getattr(self.salesforce, table)
records = df.to_dict("records")
records_cp = records.copy()

for record in records_cp:

if external_id:
if record[external_id] is None:
continue
else:
merge_key = f"{external_id}/{record[external_id]}"
record.pop(external_id)
else:
merge_key = record.pop("Id")

            try:
                response_code = table_to_upsert.upsert(data=record, record_id=merge_key)
            except SalesforceMalformedRequest as e:
                msg = f"Upsert of record {merge_key} failed."
                if raise_on_error:
                    raise ValueError(msg) from e
                # No response code was produced; log and move on to the next record.
                self.logger.warning(msg)
                continue

valid_response_codes = {200: "updated", 201: "created", 204: "updated"}

if response_code not in valid_response_codes:
msg = f"Upsert failed for record: \n{record} with response code {response_code }"
if raise_on_error:
raise ValueError(msg)
else:
self.logger.warning(msg)
else:
self.logger.info(
f"Successfully {valid_response_codes[response_code]} record {merge_key}."
)

self.logger.info(
f"Successfully upserted {len(records)} records into table '{table}'."
)
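
A short sketch of a single-record upsert keyed on an external ID column, mirroring the unit tests above. The config key, column name, and values are placeholders taken from the tests.

import pandas as pd

from viadot.sources import Salesforce

sf_source = Salesforce(config_key="salesforce_dev")  # placeholder config key
df = pd.DataFrame({"LastName": ["prefect-viadot-test"], "SAPContactId__c": ["111"]})

# Upserts row by row; raise_on_error=True surfaces failures instead of logging warnings.
sf_source.upsert(df=df, table="Contact", external_id="SAPContactId__c", raise_on_error=True)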

def bulk_upsert(
self,
df: pd.DataFrame,
table: str,
external_id: str = None,
batch_size: int = 10000,
raise_on_error: bool = False,
) -> None:
"""
        Performs an upsert of multiple rows in a table, in batches, using the Salesforce Bulk API.

Args:
df (pd.DataFrame): The DataFrame to upsert.
table (str): The table where the data should be upserted.
external_id (str, optional): The external ID to use for the upsert. Defaults to None.
batch_size (int, optional): Number of records to be included in each batch of records
that are sent to the Salesforce API for processing. Defaults to 10000.
raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails.
If False, we only display a warning. Defaults to False.
"""
if df.empty:
self.logger.info("No data to upsert.")
return

if external_id and external_id not in df.columns:
raise ValueError(
f"Passed DataFrame does not contain column '{external_id}'."
)
records = df.to_dict("records")

try:
            response_code = getattr(self.salesforce.bulk, table).upsert(
data=records, external_id_field=external_id, batch_size=batch_size
)
except SalesforceMalformedRequest as e:
# Bulk insert didn't work at all.
raise ValueError(f"Upsert of records failed: {e}") from e

self.logger.info(f"Successfully upserted bulk records.")

if any(result.get("success") is not True for result in response_code):
# Upsert of some individual records failed.
failed_records = [
result for result in response_code if result.get("success") is not True
]
msg = f"Upsert failed for records {failed_records} with response code {response_code}"
if raise_on_error:
raise ValueError(msg)
else:
self.logger.warning(msg)

self.logger.info(
f"Successfully upserted {len(records)} records into table '{table}'."
)
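
And a corresponding sketch for bulk_upsert, which sends the records to the Bulk API in batches. The data and batch size are illustrative.

import pandas as pd

from viadot.sources import Salesforce

sf_source = Salesforce(config_key="salesforce_dev")  # placeholder config key
df = pd.DataFrame(
    {
        "LastName": ["prefect-viadot-test-1", "prefect-viadot-test-2"],
        "SAPContactId__c": ["111", "112"],
    }
)
sf_source.bulk_upsert(df=df, table="Contact", external_id="SAPContactId__c", batch_size=5000)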

def download(
self, query: str = None, table: str = None, columns: List[str] = None
) -> List[OrderedDict]:
"""
        Download all data from the indicated table or the result of the specified query.

Args:
            query (str, optional): The query to be used to download the data. Defaults to None.
table (str, optional): Table name. Defaults to None.
            columns (List[str], optional): List of required columns; requires `table` to be specified.
                Defaults to None.

Returns:
List[OrderedDict]: Selected rows from Salesforce.
"""
if not query:
if columns:
columns_str = ", ".join(columns)
else:
columns_str = "FIELDS(STANDARD)"
query = f"SELECT {columns_str} FROM {table}"
records = self.salesforce.query(query).get("records")
# Remove metadata from the data
_ = [record.pop("attributes") for record in records]
return records

def to_df(
self,
query: str = None,
table: str = None,
columns: List[str] = None,
) -> pd.DataFrame:
"""
        Downloads the selected data and converts it into a pandas DataFrame.

Args:
            query (str, optional): The query to be used to download the data. Defaults to None.
            table (str, optional): Table name. Defaults to None.
            columns (List[str], optional): List of required columns; requires `table` to be specified.
                Defaults to None.

Returns:
pd.DataFrame: Selected rows from Salesforce.
"""
records = self.download(query=query, table=table, columns=columns)

return pd.DataFrame(records)
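
Finally, a brief sketch of the two read paths, download() and to_df(). The table and column names are placeholders taken from the tests above.

from viadot.sources import Salesforce

sf_source = Salesforce(config_key="salesforce_dev")  # placeholder config key

# List of OrderedDicts with Salesforce's "attributes" metadata already stripped.
records = sf_source.download(table="Account", columns=["Id", "Name"])

# The same data as a DataFrame; a raw SOQL query can be passed instead of table/columns.
df = sf_source.to_df(query="SELECT Id, Name FROM Account")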