Salesforce migration #629

Closed
wants to merge 37 commits into from
Changes from 9 commits
Commits
37 commits
3615647
✨ Migrated salesforce source
djagoda881 Feb 15, 2023
66d9063
🔥 Removed object instance
djagoda881 Feb 15, 2023
7951428
✨ Added slaesforce source to __init__
djagoda881 Feb 15, 2023
d08d07f
✅ Added unit tests to salesfource source
djagoda881 Feb 21, 2023
8b99f6a
✅ Updated test structure
djagoda881 Feb 21, 2023
773797d
✅ Migrated unit tests from viadot 1.0
djagoda881 Feb 23, 2023
7fc3688
🔐 Updated name of salesforce credentials
djagoda881 Feb 23, 2023
ee468c9
🔥 Removed if_empty parameter
djagoda881 Mar 2, 2023
4046167
✅ Updated unit tests structure
djagoda881 Mar 2, 2023
bf79a66
📝 Updated main class docstring
djagoda881 Mar 2, 2023
a3fea0e
📝 Added docstring to Slesforce class functions
djagoda881 Mar 2, 2023
f976ecd
🎨 Changed name of 'code' var to 'valid_response_code'
djagoda881 Mar 2, 2023
f5deb9a
🎨 Changed name of 'response' var to 'response_code'
djagoda881 Mar 2, 2023
ddee24f
🔥 Removed response_code var declaration
djagoda881 Mar 3, 2023
0e07e8d
🎨 Improved connetion with env
djagoda881 Mar 6, 2023
3aaad06
💡 Updated comment in download fuction
djagoda881 Mar 6, 2023
d0f0416
🔐 Updated config_key names
djagoda881 Mar 6, 2023
63913e0
Revert "🔐 Updated config_key names"
djagoda881 Mar 6, 2023
a8fa335
🔐 Updated config_key name after git revert
djagoda881 Mar 6, 2023
b408db7
✅ Improved cleaning after tests
djagoda881 Mar 6, 2023
ab1d1af
✅ 🔥 Removed test `test_upsert_empty`
djagoda881 Mar 6, 2023
de73e4c
✅ 🎨Improved structure of upsert test
djagoda881 Mar 6, 2023
61bf161
✅🎨Improved structure of test_to_df
djagoda881 Mar 6, 2023
47e177e
✅ 🎨 Improved structure of `test_upsert`
djagoda881 Mar 6, 2023
3e2ef08
✅ 🎨 Changed name of variable
djagoda881 Mar 6, 2023
97e6dbb
🎨 Improved retrieving externalID
djagoda881 Mar 6, 2023
5625cee
⏪ Removed tests code
djagoda881 Mar 6, 2023
4215dea
✅ Chenged testing structure in upsert tests
djagoda881 Mar 9, 2023
b6e311e
🎨Rename `external_id` var to `external_id_column`
djagoda881 Mar 9, 2023
c8bdf4e
✅ Added inserting two rows
djagoda881 Mar 9, 2023
5adfbe2
✅ Improved `test_to_df`
djagoda881 Mar 9, 2023
570b579
✅📝Improved docstings in Salesforce class
djagoda881 Mar 9, 2023
4415009
🐛 Corrected variable name
djagoda881 Mar 10, 2023
990775a
✅ Added bulk upsert test and refeactored tests
djagoda881 Mar 10, 2023
03d07c4
📝 Updated docstrings in Salesforce source
djagoda881 Mar 20, 2023
7885ba7
✅ Removed try except from tests
djagoda881 Mar 21, 2023
ea0fa2a
✅ Renamed test and fixed typos
djagoda881 Mar 21, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `ExchangeRates` source to the library.
- Added `from_df()` method to `Azure Data Lake` source
- Added `SAPRFC` source to the library.
- Added `Salesforce` source to the library.

### Changed
- Added `SQLServerToDF` task
101 changes: 101 additions & 0 deletions tests/unit/test_salesforce.py
@@ -0,0 +1,101 @@
import pandas as pd
import pytest

from viadot.sources import Salesforce

TABLE_TO_DOWNLOAD = "Account"
TABLE_TO_UPSERT = "Contact"
TEST_LAST_NAME = "prefect-viadot-test"
ID_TO_UPSERT = "0035E00001YGWK3QAP"


@pytest.fixture(scope="session")
def salesforce():
    s = Salesforce(config_key="sales_force_dev")
    yield s


@pytest.fixture(scope="session")
def test_df_data(salesforce):
    data = {
        "Id": [ID_TO_UPSERT],
        "LastName": [TEST_LAST_NAME],
    }
    df = pd.DataFrame(data=data)

    yield df

    data_restored = {
        "Id": [ID_TO_UPSERT],
        "LastName": ["LastName"],
    }
    df_restored = pd.DataFrame(data=data_restored)
    salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT)
Contributor

You should not use a function you're about to test. Also, the teardown should clean up resources, not create them.

Contributor

@trymzet trymzet Mar 7, 2023

The second point remains. These fixtures are confusing because they mix two things: they create a test df but also upsert records. The tests are hard to follow since records are already upserted in the fixture, and I don't think that should be happening.

Perhaps you meant to insert some records before running the upsert() test, in order to validate that the Salesforce.upsert() method updates them correctly (the "update" part of upsert()'s functionality)? In that case, I don't think this needs to be in a fixture; you can do the following. Note the cleanup at the end, please add it as well: records should be created and removed in each test.

TEST_DF_EXTERNAL = pd.DataFrame({"LastName": [TEST_LAST_NAME], "SAPContactId__c": ["111"]})

def test_upsert(salesforce):
    # Setup.
    sf = salesforce.salesforce
    test_record = TEST_DF_EXTERNAL.to_records()  # not sure if this is the data structure required by insert(), pls check
    sf.Contact.insert(test_record)
     
    # Test.
    salesforce.upsert(
        df=TEST_DF_EXTERNAL, table=TABLE_TO_UPSERT, external_id="SAPContactId__c"
    )

    result = sf.query(
        f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'"
    )
    assert result["records"][0]["LastName"] == TEST_LAST_NAME

    # Cleanup
    sf.delete(test_record)
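
The setup/test/cleanup flow suggested above can be tried out without a live Salesforce org. The sketch below assumes that records are created with simple_salesforce's `create()` (which takes a dict and returns a dict containing the new `id`) and removed with `delete(record_id)`, instead of the `insert()` call that the suggestion flags as unchecked. `FakeContact` and `run_with_test_record` are hypothetical names used only for illustration; they are not part of viadot or simple_salesforce.

```python
TEST_LAST_NAME = "prefect-viadot-test"
TEST_RECORD = {"LastName": TEST_LAST_NAME, "SAPContactId__c": "111"}


class FakeContact:
    """Hypothetical in-memory stand-in for `sf.Contact`."""

    def __init__(self):
        self.rows = {}
        self._next_id = 1

    def create(self, data):
        # Mirrors the assumed shape of a create() result: a dict with an "id".
        record_id = f"fake-{self._next_id}"
        self._next_id += 1
        self.rows[record_id] = dict(data)
        return {"id": record_id, "success": True, "errors": []}

    def delete(self, record_id):
        del self.rows[record_id]


def run_with_test_record(contact, test_body):
    """Create one record, run `test_body`, and always delete the record."""
    record_id = contact.create(TEST_RECORD)["id"]
    try:
        return test_body(record_id)
    finally:
        # Cleanup runs even if the test body raises.
        contact.delete(record_id)


contact = FakeContact()
seen_last_name = run_with_test_record(
    contact, lambda record_id: contact.rows[record_id]["LastName"]
)
```

The `try`/`finally` is the design choice worth keeping: the record is removed whether or not the assertions in the test body pass, so a failed run does not leave test data behind.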

Author

To be discussed

I have changed these structures: the fixtures now create a test record in the database and delete it when the test finishes.

✅ Chenged testing structure in upsert tests



@pytest.fixture(scope="session")
def test_df_external(salesforce):
    data = {
        "LastName": [TEST_LAST_NAME],
        "SAPContactId__c": ["111"],
    }
    df = pd.DataFrame(data=data)
    yield df

    data_restored = {
        "Id": [ID_TO_UPSERT],
        "LastName": ["LastName"],
    }
    df_restored = pd.DataFrame(data=data_restored)
    salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT)


def test_upsert_empty(salesforce):
    try:
        df = pd.DataFrame()
        salesforce.upsert(df=df, table=TABLE_TO_UPSERT)
    except Exception as exception:
        assert False, exception


def test_upsert_external_id_correct(salesforce, test_df_external):
    try:
        salesforce.upsert(
            df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPContactId__c"
        )
    except Exception as exception:
        assert False, exception
    df = salesforce.to_df(
        query=f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE LastName='{TEST_LAST_NAME}'"
    )

    result = df.values
    assert result[0][0] == ID_TO_UPSERT
    assert result[0][1] == TEST_LAST_NAME


def test_upsert_external_id_wrong(salesforce, test_df_external):
    with pytest.raises(ValueError):
        salesforce.upsert(
            df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPId"
        )


def test_download_no_query(salesforce):
    ordered_dict = salesforce.download(table=TABLE_TO_DOWNLOAD)
    assert len(ordered_dict) > 0


def test_download_with_query(salesforce):
    query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD}"
Contributor

Is it possible to add some limit to the query to not download the whole table?
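
One way to address this is to append a SOQL `LIMIT` clause, so the test fetches only a handful of rows. A minimal sketch, where `build_query` is a hypothetical helper and not part of this PR:

```python
from typing import List, Optional


def build_query(table: str, columns: List[str], limit: Optional[int] = None) -> str:
    """Build a SELECT statement, optionally capped with a LIMIT clause."""
    query = f"SELECT {', '.join(columns)} FROM {table}"
    if limit is not None:
        if limit < 1:
            raise ValueError("limit must be a positive integer.")
        query += f" LIMIT {limit}"
    return query


limited_query = build_query("Account", ["Id", "Name"], limit=10)
```

In the test, the resulting string could be passed as `salesforce.download(query=limited_query)`; the `len(...) > 0` assertion would hold as long as the table has at least one row.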

    ordered_dict = salesforce.download(query=query)
    assert len(ordered_dict) > 0


def test_to_df(salesforce):
    df = salesforce.to_df(table=TABLE_TO_DOWNLOAD)
    assert df.empty == False


def test_upsert(salesforce, test_df_data):
    salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT)
1 change: 1 addition & 0 deletions viadot/sources/__init__.py
@@ -11,3 +11,4 @@
from .s3 import S3
from .sharepoint import Sharepoint
from .redshift_spectrum import RedshiftSpectrum
from .salesforce import Salesforce
192 changes: 192 additions & 0 deletions viadot/sources/salesforce.py
@@ -0,0 +1,192 @@
from typing import Any, Dict, List, Literal, OrderedDict

import pandas as pd
from simple_salesforce import Salesforce as SF
from simple_salesforce.exceptions import SalesforceMalformedRequest
from viadot.config import get_source_credentials
from viadot.exceptions import CredentialError
from viadot.sources.base import Source


class Salesforce(Source):
    """
    A class for pulling data from Salesforce.

    Args:
        domain (str): Domain of the connection; defaults to 'test' (sandbox). Can be set only if built-in username/password/security token is provided.
        client_id (str): Client ID used to keep track of API calls.
        credentials (dict): Credentials to connect with. If not provided, will read from local config file.
        env (Literal): Environment information, provides information about credential and connection configuration; defaults to 'DEV'.
        config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None.

    """

    def __init__(
        self,
        *args,
        domain: str = "test",
        client_id: str = "viadot",
        credentials: Dict[str, Any] = None,
        env: Literal["DEV", "QA", "PROD"] = "DEV",
        config_key: str = None,
        **kwargs,
    ):

        credentials = credentials or get_source_credentials(config_key) or {}

        # `credentials` can no longer be None here, so check for emptiness.
        if not credentials:
            raise CredentialError("Please specify the credentials.")

        super().__init__(*args, credentials=credentials, **kwargs)

        if env.upper() == "DEV":
            self.salesforce = SF(
                username=self.credentials.get("username"),
                password=self.credentials.get("password"),
                security_token=self.credentials.get("token"),
                domain=domain,
                client_id=client_id,
            )
        elif env.upper() == "QA":
            self.salesforce = SF(
                username=self.credentials.get("username"),
                password=self.credentials.get("password"),
                security_token=self.credentials.get("token"),
                domain=domain,
                client_id=client_id,
            )
        elif env.upper() == "PROD":
            self.salesforce = SF(
                username=self.credentials.get("username"),
                password=self.credentials.get("password"),
                security_token=self.credentials.get("token"),
                domain=domain,
                client_id=client_id,
            )
        else:
            raise ValueError("The only available environments are DEV, QA, and PROD.")
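
The three `env` branches above pass identical arguments, so the environment can be validated once and the connection arguments built in a single place. A minimal sketch of that idea, assuming nothing differs between environments; `build_connection_kwargs` and `VALID_ENVS` are hypothetical names, not part of viadot:

```python
from typing import Dict, Optional

VALID_ENVS = ("DEV", "QA", "PROD")


def build_connection_kwargs(
    credentials: Dict[str, str],
    env: str = "DEV",
    domain: str = "test",
    client_id: str = "viadot",
) -> Dict[str, Optional[str]]:
    """Validate `env` once and return keyword arguments for the client."""
    if env.upper() not in VALID_ENVS:
        raise ValueError("The only available environments are DEV, QA, and PROD.")
    return {
        "username": credentials.get("username"),
        "password": credentials.get("password"),
        "security_token": credentials.get("token"),
        "domain": domain,
        "client_id": client_id,
    }


# Illustrative credentials only.
kwargs = build_connection_kwargs(
    {"username": "user", "password": "secret", "token": "abc"}, env="qa"
)
```

Inside `__init__`, the result could then be unpacked as `SF(**kwargs)`.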

    def upsert(
        self,
        df: pd.DataFrame,
        table: str,
        external_id: str = None,
        raise_on_error: bool = False,
    ) -> None:

        if df.empty:
            self.logger.info("No data to upsert.")
            return

        if external_id and external_id not in df.columns:
            raise ValueError(
                f"Passed DataFrame does not contain column '{external_id}'."
            )

        table_to_upsert = getattr(self.salesforce, table)
        records = df.to_dict("records")
        records_cp = records.copy()

        for record in records_cp:
            response = 0
            if external_id:
                if record[external_id] is None:
                    continue
                else:
                    merge_key = f"{external_id}/{record[external_id]}"
                    record.pop(external_id)
            else:
                merge_key = record.pop("Id")

            try:
                response = table_to_upsert.upsert(data=record, record_id=merge_key)
            except SalesforceMalformedRequest as e:
                msg = f"Upsert of record {merge_key} failed."
                if raise_on_error:
                    raise ValueError(msg) from e
                else:
                    self.logger.warning(msg)

            codes = {200: "updated", 201: "created", 204: "updated"}

            if response not in codes:
                msg = f"Upsert failed for record: \n{record} with response {response}"
                if raise_on_error:
                    raise ValueError(msg)
                else:
                    self.logger.warning(msg)
            else:
                self.logger.info(f"Successfully {codes[response]} record {merge_key}.")

        self.logger.info(
            f"Successfully upserted {len(records)} records into table '{table}'."
        )
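
`upsert()` identifies each record either by `"<external_id>/<value>"` or by its `Id`, and removes that field from the payload before sending it. Note that `records.copy()` copies only the list, so `record.pop(...)` also removes the key from the dicts held in `records`. A minimal sketch of the same key-building step that leaves the input untouched; `split_merge_key` is a hypothetical helper, not part of this PR:

```python
from typing import Any, Dict, Optional, Tuple


def split_merge_key(
    record: Dict[str, Any], external_id: Optional[str] = None
) -> Tuple[str, Dict[str, Any]]:
    """Return (merge_key, payload) without mutating `record`."""
    payload = dict(record)  # copy, so popping does not alter the caller's dict
    if external_id:
        value = payload.pop(external_id)
        return f"{external_id}/{value}", payload
    return payload.pop("Id"), payload


original = {"LastName": "prefect-viadot-test", "SAPContactId__c": "111"}
merge_key, payload = split_merge_key(original, external_id="SAPContactId__c")
```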

    def bulk_upsert(
        self,
        df: pd.DataFrame,
        table: str,
        external_id: str = None,
        batch_size: int = 10000,
        raise_on_error: bool = False,
    ) -> None:

        if df.empty:
            self.logger.info("No data to upsert.")
            return

        if external_id and external_id not in df.columns:
            raise ValueError(
                f"Passed DataFrame does not contain column '{external_id}'."
            )
        records = df.to_dict("records")
        response = 0
        try:
            response = self.salesforce.bulk.__getattr__(table).upsert(
                data=records, external_id_field=external_id, batch_size=batch_size
            )
        except SalesforceMalformedRequest as e:
            # Bulk insert didn't work at all.
            raise ValueError(f"Upsert of records failed: {e}") from e

        self.logger.info(f"Successfully upserted bulk records.")

        if any(result.get("success") is not True for result in response):
            # Upsert of some individual records failed.
            failed_records = [
                result for result in response if result.get("success") is not True
            ]
            msg = f"Upsert failed for records {failed_records} with response {response}"
            if raise_on_error:
                raise ValueError(msg)
            else:
                self.logger.warning(msg)

        self.logger.info(
            f"Successfully upserted {len(records)} records into table '{table}'."
        )
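
The per-record check in `bulk_upsert()` can be illustrated in isolation. The sketch below assumes, as the code above does, that the bulk response is a list of dicts each carrying a `success` key; the sample response is made up for illustration and `summarize_bulk_response` is a hypothetical helper:

```python
from typing import Any, Dict, List, Tuple


def summarize_bulk_response(
    response: List[Dict[str, Any]],
) -> Tuple[int, List[Dict[str, Any]]]:
    """Return the number of successful results and the list of failed ones."""
    failed = [result for result in response if result.get("success") is not True]
    return len(response) - len(failed), failed


# Illustrative data only, not a captured API response.
sample_response = [
    {"success": True, "id": "001", "errors": []},
    {"success": False, "id": None, "errors": ["illustrative error"]},
]
succeeded, failed = summarize_bulk_response(sample_response)
```

Logging `succeeded` instead of `len(records)` would keep the final message accurate when some records fail and `raise_on_error` is False.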

    def download(
        self, query: str = None, table: str = None, columns: List[str] = None
    ) -> List[OrderedDict]:
        if not query:
            if columns:
                columns_str = ", ".join(columns)
            else:
                columns_str = "FIELDS(STANDARD)"
            query = f"SELECT {columns_str} FROM {table}"
        records = self.salesforce.query(query).get("records")
        # Take trash out.
        _ = [record.pop("attributes") for record in records]
        return records
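
`download()` drops the `attributes` entry from each returned record by popping it inside a list comprehension that is used only for its side effect. A minimal sketch of an equivalent step that builds new dicts instead; `strip_attributes` is a hypothetical helper and the sample record is illustrative:

```python
from typing import Any, Dict, List


def strip_attributes(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of `records` without the 'attributes' metadata key."""
    return [
        {key: value for key, value in record.items() if key != "attributes"}
        for record in records
    ]


# Illustrative data only.
sample_records = [{"attributes": {"type": "Account"}, "Id": "001", "Name": "Acme"}]
cleaned = strip_attributes(sample_records)
```

Unlike `record.pop("attributes")`, this version does not raise `KeyError` when a record has no `attributes` key.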

    def to_df(
        self,
        query: str = None,
        table: str = None,
        columns: List[str] = None,
    ) -> pd.DataFrame:

        records = self.download(query=query, table=table, columns=columns)

        return pd.DataFrame(records)