From 36156479d6754be014bb5986012c62a99bea7289 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Wed, 15 Feb 2023 13:59:20 +0100 Subject: [PATCH 01/37] =?UTF-8?q?=E2=9C=A8=20Migrated=20salesforce=20sourc?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 197 +++++++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 viadot/sources/salesforce.py diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py new file mode 100644 index 000000000..8804a1657 --- /dev/null +++ b/viadot/sources/salesforce.py @@ -0,0 +1,197 @@ +from typing import Any, Dict, List, Literal, OrderedDict + +import pandas as pd +from simple_salesforce import Salesforce as SF +from simple_salesforce.exceptions import SalesforceMalformedRequest +from viadot.config import get_source_credentials +from viadot.exceptions import CredentialError +from viadot.sources.base import Source + + +class Salesforce(Source): + """ + A class for pulling data from theSalesforce. + Args: + domain (str): domain of a connection; defaults to 'test' (sandbox). Can be added only if built-in username/password/security token is provided. + client_id (str): client id to keep the track of API calls. + credentials (dict): credentials to connect with. If not provided, will read from local config file. + env (Literal): environment information, provides information about credential and connection configuration; defaults to 'DEV'. + config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. + ---------- + """ + + def __init__( + self, + *args, + domain: str = "test", + client_id: str = "viadot", + credentials: Dict[str, Any] = None, + env: Literal["DEV", "QA", "PROD"] = "DEV", + config_key: str = None, + **kwargs, + ): + credentials = credentials or get_source_credentials(config_key) or {} + + if credentials is None: + raise CredentialError("Please specify the credentials.") + + super().__init__(*args, credentials=credentials, **kwargs) + + if env.upper() == "DEV": + self.salesforce = SF( + username=self.credentials.get("username"), + password=self.credentials.get("password"), + security_token=self.credentials.get("token"), + domain=domain, + client_id=client_id, + ) + elif env.upper() == "QA": + self.salesforce = SF( + username=self.credentials.get("username"), + password=self.credentials.get("password"), + security_token=self.credentials.get("token"), + domain=domain, + client_id=client_id, + ) + elif env.upper() == "PROD": + self.salesforce = SF( + username=self.credentials.get("username"), + password=self.credentials.get("password"), + security_token=self.credentials.get("token"), + domain=domain, + client_id=client_id, + ) + else: + raise ValueError("The only available environments are DEV, QA, and PROD.") + + def upsert( + self, + df: pd.DataFrame, + table: str, + external_id: str = None, + raise_on_error: bool = False, + ) -> None: + + if df.empty: + self.logger.info("No data to upsert.") + return + + if external_id and external_id not in df.columns: + raise ValueError( + f"Passed DataFrame does not contain column '{external_id}'." + ) + + table_to_upsert = getattr(self.salesforce, table) + records = df.to_dict("records") + records_cp = records.copy() + + for record in records_cp: + response = 0 + if external_id: + if record[external_id] is None: + continue + else: + merge_key = f"{external_id}/{record[external_id]}" + record.pop(external_id) + else: + merge_key = record.pop("Id") + + try: + response = table_to_upsert.upsert(data=record, record_id=merge_key) + except SalesforceMalformedRequest as e: + msg = f"Upsert of record {merge_key} failed." + if raise_on_error: + raise ValueError(msg) from e + else: + self.logger.warning(msg) + + codes = {200: "updated", 201: "created", 204: "updated"} + + if response not in codes: + msg = f"Upsert failed for record: \n{record} with response {response}" + if raise_on_error: + raise ValueError(msg) + else: + self.logger.warning(msg) + else: + self.logger.info(f"Successfully {codes[response]} record {merge_key}.") + + self.logger.info( + f"Successfully upserted {len(records)} records into table '{table}'." + ) + + def bulk_upsert( + self, + df: pd.DataFrame, + table: str, + external_id: str = None, + batch_size: int = 10000, + raise_on_error: bool = False, + ) -> None: + + if df.empty: + self.logger.info("No data to upsert.") + return + + if external_id and external_id not in df.columns: + raise ValueError( + f"Passed DataFrame does not contain column '{external_id}'." + ) + records = df.to_dict("records") + response = 0 + try: + response = self.salesforce.bulk.__getattr__(table).upsert( + data=records, external_id_field=external_id, batch_size=batch_size + ) + except SalesforceMalformedRequest as e: + # Bulk insert didn't work at all. + raise ValueError(f"Upsert of records failed: {e}") from e + + self.logger.info(f"Successfully upserted bulk records.") + + if any(result.get("success") is not True for result in response): + # Upsert of some individual records failed. + failed_records = [ + result for result in response if result.get("success") is not True + ] + msg = f"Upsert failed for records {failed_records} with response {response}" + if raise_on_error: + raise ValueError(msg) + else: + self.logger.warning(msg) + + self.logger.info( + f"Successfully upserted {len(records)} records into table '{table}'." + ) + + def download( + self, query: str = None, table: str = None, columns: List[str] = None + ) -> List[OrderedDict]: + if not query: + if columns: + columns_str = ", ".join(columns) + else: + columns_str = "FIELDS(STANDARD)" + query = f"SELECT {columns_str} FROM {table}" + records = self.salesforce.query(query).get("records") + # Take trash out. + _ = [record.pop("attributes") for record in records] + return records + + def to_df( + self, + query: str = None, + table: str = None, + columns: List[str] = None, + if_empty: str = None, + ) -> pd.DataFrame: + # TODO: handle if_empty, add typing (should be Literal) + records = self.download(query=query, table=table, columns=columns) + + if not records: + raise ValueError(f"Query produced no data.") + + return pd.DataFrame(records) + + +Salesforce() From 66d90639b4e911eb67b000bb16e2fcad6df8c73a Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Wed, 15 Feb 2023 14:37:02 +0100 Subject: [PATCH 02/37] =?UTF-8?q?=F0=9F=94=A5=20Removed=20object=20instanc?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 8804a1657..b4ea1f703 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -30,6 +30,7 @@ def __init__( config_key: str = None, **kwargs, ): + credentials = credentials or get_source_credentials(config_key) or {} if credentials is None: @@ -192,6 +193,3 @@ def to_df( raise ValueError(f"Query produced no data.") return pd.DataFrame(records) - - -Salesforce() From 7951428d402d85ce37fc4486ebe29bee3072124e Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Wed, 15 Feb 2023 17:04:36 +0100 Subject: [PATCH 03/37] =?UTF-8?q?=E2=9C=A8=20Added=20slaesforce=20source?= =?UTF-8?q?=20to=20=5F=5Finit=5F=5F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + viadot/sources/__init__.py | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ee242db2..0b0c67877 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `ExchangeRates` source to the library. - Added `from_df()` method to `Azure Data Lake` source - Added `SAPRFC` source to the library. +- Added `Salesforce` source to the library. ### Changed - Added `SQLServerToDF` task diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 4ff03f3fa..994a5451e 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -11,3 +11,4 @@ from .s3 import S3 from .sharepoint import Sharepoint from .redshift_spectrum import RedshiftSpectrum +from .salesforce import Salesforce From d08d07f9b811649fc4ff550b496be668dce22e9b Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Tue, 21 Feb 2023 15:10:55 +0100 Subject: [PATCH 04/37] =?UTF-8?q?=E2=9C=85=20Added=20unit=20tests=20to=20s?= =?UTF-8?q?alesfource=20source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 62 +++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/unit/test_salesforce.py diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py new file mode 100644 index 000000000..0a716773d --- /dev/null +++ b/tests/unit/test_salesforce.py @@ -0,0 +1,62 @@ +import pandas as pd +import pytest +from viadot.sources import Salesforce + +OBJECT_NAME = "TestObject__c" +ROW_NAME = "Test_row_1" +UPSERTED_ROW = "Test_upser" + + +@pytest.fixture(scope="session", autouse=True) +def setting_up_test_environment(): + + sf = Salesforce(config_key="sales-force") + sf_api = sf.salesforce + + mdapi = sf_api.mdapi + + custom_object = mdapi.CustomObject( + fullName=OBJECT_NAME, + label="Test Object", + pluralLabel="Custom Objects", + nameField=mdapi.CustomField(label="Name", type=mdapi.FieldType("Text")), + deploymentStatus=mdapi.DeploymentStatus("Deployed"), + sharingModel=mdapi.SharingModel("ReadWrite"), + ) + + mdapi.CustomObject.create(custom_object) + sf_api.TestObject__c.create({"Name": ROW_NAME}) + + yield + + mdapi.CustomObject.delete(OBJECT_NAME) + + +def test_to_df_selected_table(setting_up_test_environment): + + sf = Salesforce(config_key="sales-force") + df = sf.to_df(table=OBJECT_NAME) + assert df["Name"][0] == ROW_NAME + assert len(df.axes[1]) == 10 + + +def test_to_df_selected_table_query(setting_up_test_environment): + sf = Salesforce(config_key="sales-force") + df = sf.to_df(query="SELECT FIELDS(STANDARD) FROM TestObject__c") + assert df["Name"][0] == ROW_NAME + assert len(df.axes[1]) == 10 + + +def test_upsert_existing_row(setting_up_test_environment): + + sf = Salesforce(config_key="sales-force") + df = sf.to_df(query="SELECT Id FROM TestObject__c") + + df1 = pd.DataFrame([{"Id": df["Id"][0], "Name": UPSERTED_ROW}]) + sf = Salesforce(config_key="sales-force") + sf.upsert(df=df1, table=OBJECT_NAME) + + df = sf.to_df(query="SELECT FIELDS(STANDARD) FROM TestObject__c") + + assert df["Name"][0] == UPSERTED_ROW + assert len(df.axes[1]) == 10 From 8b99f6a083e9d9f412046ce94051843de7d78033 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Tue, 21 Feb 2023 15:47:54 +0100 Subject: [PATCH 05/37] =?UTF-8?q?=E2=9C=85=20Updated=20test=20structure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 70 ++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 0a716773d..805e3bf39 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -3,8 +3,10 @@ from viadot.sources import Salesforce OBJECT_NAME = "TestObject__c" -ROW_NAME = "Test_row_1" -UPSERTED_ROW = "Test_upser" +ROW_NAME_1 = "Test_row_1" +ROW_NAME_2 = "Test_row_2" +UPSERTED_ROW_1 = "Test_upser_1" +UPSERTED_ROW_2 = "Test_upser_2" @pytest.fixture(scope="session", autouse=True) @@ -25,7 +27,8 @@ def setting_up_test_environment(): ) mdapi.CustomObject.create(custom_object) - sf_api.TestObject__c.create({"Name": ROW_NAME}) + sf_api.TestObject__c.create({"Name": ROW_NAME_1}) + sf_api.TestObject__c.create({"Name": ROW_NAME_2}) yield @@ -36,27 +39,74 @@ def test_to_df_selected_table(setting_up_test_environment): sf = Salesforce(config_key="sales-force") df = sf.to_df(table=OBJECT_NAME) - assert df["Name"][0] == ROW_NAME + print(df) + assert df["Name"][0] == ROW_NAME_1 + assert df["Name"][1] == ROW_NAME_2 assert len(df.axes[1]) == 10 def test_to_df_selected_table_query(setting_up_test_environment): sf = Salesforce(config_key="sales-force") - df = sf.to_df(query="SELECT FIELDS(STANDARD) FROM TestObject__c") - assert df["Name"][0] == ROW_NAME + df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") + assert df["Name"][0] == ROW_NAME_1 + assert df["Name"][1] == ROW_NAME_2 assert len(df.axes[1]) == 10 def test_upsert_existing_row(setting_up_test_environment): sf = Salesforce(config_key="sales-force") - df = sf.to_df(query="SELECT Id FROM TestObject__c") + df = sf.to_df(query=f"SELECT Id FROM {OBJECT_NAME}") - df1 = pd.DataFrame([{"Id": df["Id"][0], "Name": UPSERTED_ROW}]) + df1 = pd.DataFrame([{"Id": df["Id"][0], "Name": UPSERTED_ROW_1}]) sf = Salesforce(config_key="sales-force") sf.upsert(df=df1, table=OBJECT_NAME) - df = sf.to_df(query="SELECT FIELDS(STANDARD) FROM TestObject__c") + df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") - assert df["Name"][0] == UPSERTED_ROW + assert df["Name"][0] == UPSERTED_ROW_1 assert len(df.axes[1]) == 10 + + +def test_bulk_upsert_existing_rows(): + + sf = Salesforce(config_key="sales-force") + df = sf.to_df(query=f"SELECT Id FROM {OBJECT_NAME}") + + df1 = pd.DataFrame( + [ + {"Id": df["Id"][0], "Name": UPSERTED_ROW_1}, + {"Id": df["Id"][1], "Name": UPSERTED_ROW_2}, + ] + ) + sf = Salesforce(config_key="sales-force") + sf.upsert(df=df1, table=OBJECT_NAME) + + df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") + print(df) + assert df["Name"][1] == UPSERTED_ROW_2 + assert len(df.axes[1]) == 10 + + +""" +sf = Salesforce(config_key="sales-force") +sf_api = sf.salesforce + +mdapi = sf_api.mdapi + +custom_object = mdapi.CustomObject( + fullName=OBJECT_NAME, + label="Test Object", + pluralLabel="Custom Objects", + nameField=mdapi.CustomField(label="Name", type=mdapi.FieldType("Text")), + deploymentStatus=mdapi.DeploymentStatus("Deployed"), + sharingModel=mdapi.SharingModel("ReadWrite"), +) +mdapi.CustomObject.delete(OBJECT_NAME) +# mdapi.CustomObject.create(custom_object) +# sf_api.TestObject__c.create({"Name": ROW_NAME_1}) +# sf_api.TestObject__c.create({"Name": ROW_NAME_2}) + +# df = sf.to_df(table=OBJECT_NAME) +# print(df) +""" From 773797d11b74fa306db8cd99aa532391229328ad Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 23 Feb 2023 10:15:24 +0100 Subject: [PATCH 06/37] =?UTF-8?q?=E2=9C=85=20Migrated=20unit=20tests=20fro?= =?UTF-8?q?m=20viadot=201.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 148 ++++++++++++---------------------- viadot/sources/salesforce.py | 3 +- 2 files changed, 52 insertions(+), 99 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 805e3bf39..6b0997164 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -1,112 +1,64 @@ import pandas as pd import pytest -from viadot.sources import Salesforce - -OBJECT_NAME = "TestObject__c" -ROW_NAME_1 = "Test_row_1" -ROW_NAME_2 = "Test_row_2" -UPSERTED_ROW_1 = "Test_upser_1" -UPSERTED_ROW_2 = "Test_upser_2" - - -@pytest.fixture(scope="session", autouse=True) -def setting_up_test_environment(): - sf = Salesforce(config_key="sales-force") - sf_api = sf.salesforce +from viadot.sources import Salesforce - mdapi = sf_api.mdapi - custom_object = mdapi.CustomObject( - fullName=OBJECT_NAME, - label="Test Object", - pluralLabel="Custom Objects", - nameField=mdapi.CustomField(label="Name", type=mdapi.FieldType("Text")), - deploymentStatus=mdapi.DeploymentStatus("Deployed"), - sharingModel=mdapi.SharingModel("ReadWrite"), +@pytest.fixture(scope="session") +def salesforce(): + s = Salesforce(config_key="sales-force") + yield s + + +@pytest.fixture(scope="session") +def test_df_external(): + data = { + "Id": ["111"], + "LastName": ["John Tester-External"], + "SAPContactId__c": ["112"], + } + df = pd.DataFrame(data=data) + yield df + + +def test_upsert_empty(salesforce): + try: + df = pd.DataFrame() + salesforce.upsert(df=df, table="Contact") + except Exception as exception: + assert False, exception + + +def test_upsert_external_id_correct(salesforce, test_df_external): + try: + salesforce.upsert( + df=test_df_external, table="Contact", external_id="SAPContactId__c" + ) + except Exception as exception: + assert False, exception + result = salesforce.download(table="Contact") + exists = list( + filter(lambda contact: contact["LastName"] == "John Tester-External", result) ) - - mdapi.CustomObject.create(custom_object) - sf_api.TestObject__c.create({"Name": ROW_NAME_1}) - sf_api.TestObject__c.create({"Name": ROW_NAME_2}) - - yield - - mdapi.CustomObject.delete(OBJECT_NAME) + assert exists != None -def test_to_df_selected_table(setting_up_test_environment): +def test_upsert_external_id_wrong(salesforce, test_df_external): + with pytest.raises(ValueError): + salesforce.upsert(df=test_df_external, table="Contact", external_id="SAPId") - sf = Salesforce(config_key="sales-force") - df = sf.to_df(table=OBJECT_NAME) - print(df) - assert df["Name"][0] == ROW_NAME_1 - assert df["Name"][1] == ROW_NAME_2 - assert len(df.axes[1]) == 10 +def test_download_no_query(salesforce): + ordered_dict = salesforce.download(table="Account") + assert len(ordered_dict) > 0 -def test_to_df_selected_table_query(setting_up_test_environment): - sf = Salesforce(config_key="sales-force") - df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") - assert df["Name"][0] == ROW_NAME_1 - assert df["Name"][1] == ROW_NAME_2 - assert len(df.axes[1]) == 10 +def test_download_with_query(salesforce): + query = "SELECT Id, Name FROM Account" + ordered_dict = salesforce.download(query=query) + assert len(ordered_dict) > 0 -def test_upsert_existing_row(setting_up_test_environment): - sf = Salesforce(config_key="sales-force") - df = sf.to_df(query=f"SELECT Id FROM {OBJECT_NAME}") - - df1 = pd.DataFrame([{"Id": df["Id"][0], "Name": UPSERTED_ROW_1}]) - sf = Salesforce(config_key="sales-force") - sf.upsert(df=df1, table=OBJECT_NAME) - - df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") - - assert df["Name"][0] == UPSERTED_ROW_1 - assert len(df.axes[1]) == 10 - - -def test_bulk_upsert_existing_rows(): - - sf = Salesforce(config_key="sales-force") - df = sf.to_df(query=f"SELECT Id FROM {OBJECT_NAME}") - - df1 = pd.DataFrame( - [ - {"Id": df["Id"][0], "Name": UPSERTED_ROW_1}, - {"Id": df["Id"][1], "Name": UPSERTED_ROW_2}, - ] - ) - sf = Salesforce(config_key="sales-force") - sf.upsert(df=df1, table=OBJECT_NAME) - - df = sf.to_df(query=f"SELECT FIELDS(STANDARD) FROM {OBJECT_NAME}") - print(df) - assert df["Name"][1] == UPSERTED_ROW_2 - assert len(df.axes[1]) == 10 - - -""" -sf = Salesforce(config_key="sales-force") -sf_api = sf.salesforce - -mdapi = sf_api.mdapi - -custom_object = mdapi.CustomObject( - fullName=OBJECT_NAME, - label="Test Object", - pluralLabel="Custom Objects", - nameField=mdapi.CustomField(label="Name", type=mdapi.FieldType("Text")), - deploymentStatus=mdapi.DeploymentStatus("Deployed"), - sharingModel=mdapi.SharingModel("ReadWrite"), -) -mdapi.CustomObject.delete(OBJECT_NAME) -# mdapi.CustomObject.create(custom_object) -# sf_api.TestObject__c.create({"Name": ROW_NAME_1}) -# sf_api.TestObject__c.create({"Name": ROW_NAME_2}) - -# df = sf.to_df(table=OBJECT_NAME) -# print(df) -""" +def test_to_df(salesforce): + df = salesforce.to_df(table="Account") + assert df.empty == False diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index b4ea1f703..8c6c13e6b 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -11,13 +11,14 @@ class Salesforce(Source): """ A class for pulling data from theSalesforce. + Args: domain (str): domain of a connection; defaults to 'test' (sandbox). Can be added only if built-in username/password/security token is provided. client_id (str): client id to keep the track of API calls. credentials (dict): credentials to connect with. If not provided, will read from local config file. env (Literal): environment information, provides information about credential and connection configuration; defaults to 'DEV'. config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. - ---------- + """ def __init__( From 7fc3688c262f7c94e472c89113835d8a8a8bb82e Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 23 Feb 2023 10:22:10 +0100 Subject: [PATCH 07/37] =?UTF-8?q?=F0=9F=94=90=20Updated=20name=20of=20sale?= =?UTF-8?q?sforce=20credentials?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 6b0997164..57438ac3e 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -6,7 +6,7 @@ @pytest.fixture(scope="session") def salesforce(): - s = Salesforce(config_key="sales-force") + s = Salesforce(config_key="sales_force_dev") yield s From ee468c9e6eb7e98399d15df60bbfefb2b2dcc0dd Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 11:59:26 +0000 Subject: [PATCH 08/37] =?UTF-8?q?=F0=9F=94=A5=20Removed=20if=5Fempty=20par?= =?UTF-8?q?ameter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 8c6c13e6b..309215a6c 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -185,12 +185,8 @@ def to_df( query: str = None, table: str = None, columns: List[str] = None, - if_empty: str = None, ) -> pd.DataFrame: - # TODO: handle if_empty, add typing (should be Literal) - records = self.download(query=query, table=table, columns=columns) - if not records: - raise ValueError(f"Query produced no data.") + records = self.download(query=query, table=table, columns=columns) return pd.DataFrame(records) From 4046167f9136fad6ee76e3085df9f74909ca0925 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 12:10:20 +0000 Subject: [PATCH 09/37] =?UTF-8?q?=E2=9C=85=20Updated=20unit=20tests=20stru?= =?UTF-8?q?cture?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 65 +++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 57438ac3e..970be6228 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -3,6 +3,11 @@ from viadot.sources import Salesforce +TABLE_TO_DOWNLOAD = "Account" +TABLE_TO_UPSERT = "Contact" +TEST_LAST_NAME = "prefect-viadot-test" +ID_TO_UPSERT = "0035E00001YGWK3QAP" + @pytest.fixture(scope="session") def salesforce(): @@ -11,20 +16,44 @@ def salesforce(): @pytest.fixture(scope="session") -def test_df_external(): +def test_df_data(salesforce): + data = { + "Id": [ID_TO_UPSERT], + "LastName": [TEST_LAST_NAME], + } + df = pd.DataFrame(data=data) + + yield df + + data_restored = { + "Id": [ID_TO_UPSERT], + "LastName": ["LastName"], + } + df_restored = pd.DataFrame(data=data_restored) + salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT) + + +@pytest.fixture(scope="session") +def test_df_external(salesforce): data = { - "Id": ["111"], - "LastName": ["John Tester-External"], - "SAPContactId__c": ["112"], + "LastName": [TEST_LAST_NAME], + "SAPContactId__c": ["111"], } df = pd.DataFrame(data=data) yield df + data_restored = { + "Id": [ID_TO_UPSERT], + "LastName": ["LastName"], + } + df_restored = pd.DataFrame(data=data_restored) + salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT) + def test_upsert_empty(salesforce): try: df = pd.DataFrame() - salesforce.upsert(df=df, table="Contact") + salesforce.upsert(df=df, table=TABLE_TO_UPSERT) except Exception as exception: assert False, exception @@ -32,33 +61,41 @@ def test_upsert_empty(salesforce): def test_upsert_external_id_correct(salesforce, test_df_external): try: salesforce.upsert( - df=test_df_external, table="Contact", external_id="SAPContactId__c" + df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPContactId__c" ) except Exception as exception: assert False, exception - result = salesforce.download(table="Contact") - exists = list( - filter(lambda contact: contact["LastName"] == "John Tester-External", result) + df = salesforce.to_df( + query=f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE LastName='{TEST_LAST_NAME}'" ) - assert exists != None + + result = df.values + assert result[0][0] == ID_TO_UPSERT + assert result[0][1] == TEST_LAST_NAME def test_upsert_external_id_wrong(salesforce, test_df_external): with pytest.raises(ValueError): - salesforce.upsert(df=test_df_external, table="Contact", external_id="SAPId") + salesforce.upsert( + df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPId" + ) def test_download_no_query(salesforce): - ordered_dict = salesforce.download(table="Account") + ordered_dict = salesforce.download(table=TABLE_TO_DOWNLOAD) assert len(ordered_dict) > 0 def test_download_with_query(salesforce): - query = "SELECT Id, Name FROM Account" + query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD}" ordered_dict = salesforce.download(query=query) assert len(ordered_dict) > 0 def test_to_df(salesforce): - df = salesforce.to_df(table="Account") + df = salesforce.to_df(table=TABLE_TO_DOWNLOAD) assert df.empty == False + + +def test_upsert(salesforce, test_df_data): + salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) From bf79a66a06f490d315e823857fb35708621e2114 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 13:22:24 +0000 Subject: [PATCH 10/37] =?UTF-8?q?=F0=9F=93=9D=20Updated=20main=20class=20d?= =?UTF-8?q?ocstring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 309215a6c..bf2a6689b 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -10,15 +10,19 @@ class Salesforce(Source): """ - A class for pulling data from theSalesforce. + A class for downloading and upserting data from Salesforce. Args: - domain (str): domain of a connection; defaults to 'test' (sandbox). Can be added only if built-in username/password/security token is provided. - client_id (str): client id to keep the track of API calls. - credentials (dict): credentials to connect with. If not provided, will read from local config file. - env (Literal): environment information, provides information about credential and connection configuration; defaults to 'DEV'. - config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. - + domain (str, optional): Domain of a connection. Defaults to 'test' (sandbox). + Can be added only if built-in username/password/security token is provided. + client_id (str, optional): Client id to keep the track of API calls. + Defaults to 'viadot'. + credentials (Dict[str, Any], optional): Credentials to connect with Salesforce. + If not provided, will read from local config file. Defaults to None. + env (Literal["DEV", "QA", "PROD"], optional): Environment information, provides information + about credential and connection configuration. Defaults to 'DEV'. + config_key (str, optional): The key in the viadot config holding relevant credentials. + Defaults to None. """ def __init__( From a3fea0e7499287c554de2babe2f6367de9a8f058 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 14:57:29 +0000 Subject: [PATCH 11/37] =?UTF-8?q?=F0=9F=93=9D=20Added=20docstring=20to=20S?= =?UTF-8?q?lesforce=20class=20functions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 49 +++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index bf2a6689b..b7ec61674 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -77,7 +77,16 @@ def upsert( external_id: str = None, raise_on_error: bool = False, ) -> None: - + """ + Performs upsert operations on the selected row in the table. + + Args: + df (pd.DataFrame): The DataFrame to upsert. Only a single row can be upserted with this function. + table (str): The table where the data should be upserted. + external_id (str, optional): The external ID to use for the upsert. Defaults to None. + raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. + If False, we only display a warning. Defaults to False. + """ if df.empty: self.logger.info("No data to upsert.") return @@ -134,7 +143,18 @@ def bulk_upsert( batch_size: int = 10000, raise_on_error: bool = False, ) -> None: - + """ + Performs upsert operations on multiple rows in a table. + + Args: + df (pd.DataFrame): The DataFrame to upsert. + table (str): The table where the data should be upserted. + external_id (str, optional): The external ID to use for the upsert. Defaults to None. + batch_size (int, optional): Number of records to be included in each batch of records + that are sent to the Salesforce API for processing. Defaults to 10000. + raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. + If False, we only display a warning. Defaults to False. + """ if df.empty: self.logger.info("No data to upsert.") return @@ -173,6 +193,18 @@ def bulk_upsert( def download( self, query: str = None, table: str = None, columns: List[str] = None ) -> List[OrderedDict]: + """ + Dowload all data from the indicated table or the result of the specified query. + + Args: + query (str, optional): Query for download the specific data. Defaults to None. + table (str, optional): Table name. Defaults to None. + columns (List[str], optional): List of columns which are needed, + requires table argument. Defaults to None. + + Returns: + List[OrderedDict]: Selected rows from Salesforce. + """ if not query: if columns: columns_str = ", ".join(columns) @@ -190,7 +222,18 @@ def to_df( table: str = None, columns: List[str] = None, ) -> pd.DataFrame: - + """ + Converts the List returned by the download functions to a DataFrame. + + Args: + query (str, optional): Query for download the specific data. Defaults to None. + table (str, optional): Table name. Defaults to None. + columns (List[str], optional): List of columns which are needed, + requires table argument. Defaults to None. + + Returns: + pd.DataFrame: Selected rows from Salesforce. + """ records = self.download(query=query, table=table, columns=columns) return pd.DataFrame(records) From f976ecd5af528363e8ee21306e2fbe4c824d91be Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 15:19:11 +0000 Subject: [PATCH 12/37] =?UTF-8?q?=F0=9F=8E=A8=20Changed=20name=20of=20'cod?= =?UTF-8?q?e'=20var=20to=20'valid=5Fresponse=5Fcode'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index b7ec61674..95ff94d21 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -120,16 +120,18 @@ def upsert( else: self.logger.warning(msg) - codes = {200: "updated", 201: "created", 204: "updated"} + valid_response_codes = {200: "updated", 201: "created", 204: "updated"} - if response not in codes: + if response not in valid_response_codes: msg = f"Upsert failed for record: \n{record} with response {response}" if raise_on_error: raise ValueError(msg) else: self.logger.warning(msg) else: - self.logger.info(f"Successfully {codes[response]} record {merge_key}.") + self.logger.info( + f"Successfully {valid_response_codes[response]} record {merge_key}." + ) self.logger.info( f"Successfully upserted {len(records)} records into table '{table}'." From f5deb9aba7c1912a86f9d4087fab1ebe475a3e0b Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 2 Mar 2023 15:26:58 +0000 Subject: [PATCH 13/37] =?UTF-8?q?=F0=9F=8E=A8=20Changed=20name=20of=20'res?= =?UTF-8?q?ponse'=20var=20to=20'response=5Fcode'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 95ff94d21..cca51ef87 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -101,7 +101,7 @@ def upsert( records_cp = records.copy() for record in records_cp: - response = 0 + response_code = 0 if external_id: if record[external_id] is None: continue @@ -112,7 +112,7 @@ def upsert( merge_key = record.pop("Id") try: - response = table_to_upsert.upsert(data=record, record_id=merge_key) + response_code = table_to_upsert.upsert(data=record, record_id=merge_key) except SalesforceMalformedRequest as e: msg = f"Upsert of record {merge_key} failed." if raise_on_error: @@ -122,15 +122,15 @@ def upsert( valid_response_codes = {200: "updated", 201: "created", 204: "updated"} - if response not in valid_response_codes: - msg = f"Upsert failed for record: \n{record} with response {response}" + if response_code not in valid_response_codes: + msg = f"Upsert failed for record: \n{record} with response code {response_code }" if raise_on_error: raise ValueError(msg) else: self.logger.warning(msg) else: self.logger.info( - f"Successfully {valid_response_codes[response]} record {merge_key}." + f"Successfully {valid_response_codes[response_code]} record {merge_key}." ) self.logger.info( @@ -166,9 +166,9 @@ def bulk_upsert( f"Passed DataFrame does not contain column '{external_id}'." ) records = df.to_dict("records") - response = 0 + response_code = 0 try: - response = self.salesforce.bulk.__getattr__(table).upsert( + response_code = self.salesforce.bulk.__getattr__(table).upsert( data=records, external_id_field=external_id, batch_size=batch_size ) except SalesforceMalformedRequest as e: @@ -177,12 +177,12 @@ def bulk_upsert( self.logger.info(f"Successfully upserted bulk records.") - if any(result.get("success") is not True for result in response): + if any(result.get("success") is not True for result in response_code): # Upsert of some individual records failed. failed_records = [ - result for result in response if result.get("success") is not True + result for result in response_code if result.get("success") is not True ] - msg = f"Upsert failed for records {failed_records} with response {response}" + msg = f"Upsert failed for records {failed_records} with response code {response_code}" if raise_on_error: raise ValueError(msg) else: From ddee24f7af437d9fbf215a060ce8e8f6cbc04fc8 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Fri, 3 Mar 2023 10:59:55 +0000 Subject: [PATCH 14/37] =?UTF-8?q?=F0=9F=94=A5=20Removed=20response=5Fcode?= =?UTF-8?q?=20=20var=20declaration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index cca51ef87..d6f76009a 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -101,7 +101,7 @@ def upsert( records_cp = records.copy() for record in records_cp: - response_code = 0 + if external_id: if record[external_id] is None: continue @@ -166,7 +166,7 @@ def bulk_upsert( f"Passed DataFrame does not contain column '{external_id}'." ) records = df.to_dict("records") - response_code = 0 + try: response_code = self.salesforce.bulk.__getattr__(table).upsert( data=records, external_id_field=external_id, batch_size=batch_size From 0e07e8d063e90c551ae2fb71abf548bf9d853e56 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 09:25:59 +0000 Subject: [PATCH 15/37] =?UTF-8?q?=F0=9F=8E=A8=20Improved=20connetion=20wit?= =?UTF-8?q?h=20env?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index d6f76009a..f635ca791 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -17,10 +17,10 @@ class Salesforce(Source): Can be added only if built-in username/password/security token is provided. client_id (str, optional): Client id to keep the track of API calls. Defaults to 'viadot'. - credentials (Dict[str, Any], optional): Credentials to connect with Salesforce. - If not provided, will read from local config file. Defaults to None. env (Literal["DEV", "QA", "PROD"], optional): Environment information, provides information about credential and connection configuration. Defaults to 'DEV'. + credentials (Dict[str, Any], optional): Credentials to connect with Salesforce. + If not provided, will read from local config file. Defaults to None. config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. """ @@ -30,8 +30,8 @@ def __init__( *args, domain: str = "test", client_id: str = "viadot", - credentials: Dict[str, Any] = None, env: Literal["DEV", "QA", "PROD"] = "DEV", + credentials: Dict[str, Any] = None, config_key: str = None, **kwargs, ): @@ -43,15 +43,7 @@ def __init__( super().__init__(*args, credentials=credentials, **kwargs) - if env.upper() == "DEV": - self.salesforce = SF( - username=self.credentials.get("username"), - password=self.credentials.get("password"), - security_token=self.credentials.get("token"), - domain=domain, - client_id=client_id, - ) - elif env.upper() == "QA": + if env.upper() == "DEV" or env.upper() == "QA": self.salesforce = SF( username=self.credentials.get("username"), password=self.credentials.get("password"), @@ -59,14 +51,14 @@ def __init__( domain=domain, client_id=client_id, ) + elif env.upper() == "PROD": self.salesforce = SF( username=self.credentials.get("username"), password=self.credentials.get("password"), security_token=self.credentials.get("token"), - domain=domain, - client_id=client_id, ) + else: raise ValueError("The only available environments are DEV, QA, and PROD.") From 3aaad0644e57431a62871c08bc0f326d963fda7c Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 10:35:06 +0000 Subject: [PATCH 16/37] =?UTF-8?q?=F0=9F=92=A1=20Updated=20comment=20in=20d?= =?UTF-8?q?ownload=20fuction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index f635ca791..826648077 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -206,7 +206,7 @@ def download( columns_str = "FIELDS(STANDARD)" query = f"SELECT {columns_str} FROM {table}" records = self.salesforce.query(query).get("records") - # Take trash out. + # Remove metadata from the data _ = [record.pop("attributes") for record in records] return records From d0f0416fedbabaa7423774ee910616a6aec306a6 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 10:37:49 +0000 Subject: [PATCH 17/37] =?UTF-8?q?=F0=9F=94=90=20Updated=20config=5Fkey=20n?= =?UTF-8?q?ames?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 970be6228..de417f5bd 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -11,7 +11,7 @@ @pytest.fixture(scope="session") def salesforce(): - s = Salesforce(config_key="sales_force_dev") + s = Salesforce(config_key="salesforce_dev") yield s @@ -99,3 +99,12 @@ def test_to_df(salesforce): def test_upsert(salesforce, test_df_data): salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) + + +import pdb + +# pdb.set_trace() +sf = Salesforce(config_key="salesforce_qa") +print(sf.salesforce) +df = sf.to_df(table="Contact") +print(df) From 63913e058b3950a409f94b561780501210fccf1b Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 10:39:18 +0000 Subject: [PATCH 18/37] =?UTF-8?q?Revert=20"=F0=9F=94=90=20Updated=20config?= =?UTF-8?q?=5Fkey=20names"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit d0f0416fedbabaa7423774ee910616a6aec306a6. --- tests/unit/test_salesforce.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index de417f5bd..970be6228 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -11,7 +11,7 @@ @pytest.fixture(scope="session") def salesforce(): - s = Salesforce(config_key="salesforce_dev") + s = Salesforce(config_key="sales_force_dev") yield s @@ -99,12 +99,3 @@ def test_to_df(salesforce): def test_upsert(salesforce, test_df_data): salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) - - -import pdb - -# pdb.set_trace() -sf = Salesforce(config_key="salesforce_qa") -print(sf.salesforce) -df = sf.to_df(table="Contact") -print(df) From a8fa33590c9a266a5ac5f9099b6ffed78e700b08 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 10:42:34 +0000 Subject: [PATCH 19/37] =?UTF-8?q?=F0=9F=94=90=20Updated=20config=5Fkey=20n?= =?UTF-8?q?ame=20after=20git=20revert?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 970be6228..e13015867 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -11,7 +11,7 @@ @pytest.fixture(scope="session") def salesforce(): - s = Salesforce(config_key="sales_force_dev") + s = Salesforce(config_key="salesforce_dev") yield s From b408db7f60c099bc43421eaf070158d13e2a1f8d Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 11:49:25 +0000 Subject: [PATCH 20/37] =?UTF-8?q?=E2=9C=85=20Improved=20cleaning=20after?= =?UTF-8?q?=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index e13015867..d30b43083 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -1,6 +1,5 @@ import pandas as pd import pytest - from viadot.sources import Salesforce TABLE_TO_DOWNLOAD = "Account" @@ -25,12 +24,8 @@ def test_df_data(salesforce): yield df - data_restored = { - "Id": [ID_TO_UPSERT], - "LastName": ["LastName"], - } - df_restored = pd.DataFrame(data=data_restored) - salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT) + sf = salesforce.salesforce + sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"}) @pytest.fixture(scope="session") @@ -42,12 +37,8 @@ def test_df_external(salesforce): df = pd.DataFrame(data=data) yield df - data_restored = { - "Id": [ID_TO_UPSERT], - "LastName": ["LastName"], - } - df_restored = pd.DataFrame(data=data_restored) - salesforce.upsert(df=df_restored, table=TABLE_TO_UPSERT) + sf = salesforce.salesforce + sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"}) def test_upsert_empty(salesforce): From ab1d1af708b47241f6ebaefa05311edb616c1bcb Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 12:04:28 +0000 Subject: [PATCH 21/37] =?UTF-8?q?=E2=9C=85=20=F0=9F=94=A5=20Removed=20test?= =?UTF-8?q?=20`test=5Fupsert=5Fempty`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index d30b43083..b713294bd 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -41,21 +41,13 @@ def test_df_external(salesforce): sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"}) -def test_upsert_empty(salesforce): - try: - df = pd.DataFrame() - salesforce.upsert(df=df, table=TABLE_TO_UPSERT) - except Exception as exception: - assert False, exception - - def test_upsert_external_id_correct(salesforce, test_df_external): try: salesforce.upsert( df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPContactId__c" ) except Exception as exception: - assert False, exception + raise exception df = salesforce.to_df( query=f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE LastName='{TEST_LAST_NAME}'" ) From de73e4ca66fa277fb67d44c7cd5221f0c9eacc00 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 12:20:08 +0000 Subject: [PATCH 22/37] =?UTF-8?q?=20=E2=9C=85=20=F0=9F=8E=A8Improved=20str?= =?UTF-8?q?ucture=20of=20upsert=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index b713294bd..192b946dc 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -48,13 +48,13 @@ def test_upsert_external_id_correct(salesforce, test_df_external): ) except Exception as exception: raise exception - df = salesforce.to_df( - query=f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE LastName='{TEST_LAST_NAME}'" + + sf = salesforce.salesforce + result = sf.query( + f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'" ) - result = df.values - assert result[0][0] == ID_TO_UPSERT - assert result[0][1] == TEST_LAST_NAME + assert result["records"][0]["LastName"] == TEST_LAST_NAME def test_upsert_external_id_wrong(salesforce, test_df_external): From 61bf1610b01be0ae459e7578558c6df73edf96db Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 12:59:39 +0000 Subject: [PATCH 23/37] =?UTF-8?q?=E2=9C=85=F0=9F=8E=A8Improved=20structure?= =?UTF-8?q?=20of=20=20test=5Fto=5Fdf?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 192b946dc..b63552152 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -77,7 +77,10 @@ def test_download_with_query(salesforce): def test_to_df(salesforce): df = salesforce.to_df(table=TABLE_TO_DOWNLOAD) + print(len(df.values)) assert df.empty == False + assert len(df.columns) == 98 + assert len(df.values) >= 1000 def test_upsert(salesforce, test_df_data): From 47e177ecfe466d1a64c45b3db34e765f2b99cfc1 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 14:16:30 +0000 Subject: [PATCH 24/37] =?UTF-8?q?=E2=9C=85=20=F0=9F=8E=A8=20Improved=20str?= =?UTF-8?q?ucture=20of=20`test=5Fupsert`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index b63552152..98e1a00aa 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -4,7 +4,7 @@ TABLE_TO_DOWNLOAD = "Account" TABLE_TO_UPSERT = "Contact" -TEST_LAST_NAME = "prefect-viadot-test" +TEST_LAST_NAME = "viadot-test" ID_TO_UPSERT = "0035E00001YGWK3QAP" @@ -83,5 +83,15 @@ def test_to_df(salesforce): assert len(df.values) >= 1000 -def test_upsert(salesforce, test_df_data): - salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) +def test_upsert_row_id(salesforce, test_df_data): + try: + salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) + except Exception as exception: + raise exception + + sf = salesforce.salesforce + result = sf.query( + f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'" + ) + + assert result["records"][0]["LastName"] == TEST_LAST_NAME From 3e2ef0856cf130df631664474368f0fd15210bc7 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 14:20:02 +0000 Subject: [PATCH 25/37] =?UTF-8?q?=E2=9C=85=20=F0=9F=8E=A8=20Changed=20name?= =?UTF-8?q?=20of=20variable?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 98e1a00aa..1c11af7d0 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -65,14 +65,14 @@ def test_upsert_external_id_wrong(salesforce, test_df_external): def test_download_no_query(salesforce): - ordered_dict = salesforce.download(table=TABLE_TO_DOWNLOAD) - assert len(ordered_dict) > 0 + records = salesforce.download(table=TABLE_TO_DOWNLOAD) + assert len(records) > 0 def test_download_with_query(salesforce): query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD}" - ordered_dict = salesforce.download(query=query) - assert len(ordered_dict) > 0 + records = salesforce.download(query=query) + assert len(records) > 0 def test_to_df(salesforce): From 97e6dbb9a79abf04b0787c07e8f44830a654fb8f Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 14:48:21 +0000 Subject: [PATCH 26/37] =?UTF-8?q?=F0=9F=8E=A8=20Improved=20retrieving=20ex?= =?UTF-8?q?ternalID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 826648077..411ae019a 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -95,9 +95,8 @@ def upsert( for record in records_cp: if external_id: - if record[external_id] is None: - continue - else: + # If the specified external ID is on the upsert line and has a value, it will be used as merge_key + if record[external_id] is not None: merge_key = f"{external_id}/{record[external_id]}" record.pop(external_id) else: @@ -231,3 +230,25 @@ def to_df( records = self.download(query=query, table=table, columns=columns) return pd.DataFrame(records) + + +TABLE_TO_DOWNLOAD = "Account" +TABLE_TO_UPSERT = "Contact" +TEST_LAST_NAME = "viadot-test" +ID_TO_UPSERT = "0035E00001YGWK3QAP" + +s = Salesforce(config_key="salesforce_dev") +data = { + "LastName": [TEST_LAST_NAME], + "SAPContactId__c": ["111"], +} +df = pd.DataFrame(data=data) +import pdb + +pdb.set_trace() +s.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPContactId__c") + +# df = s.to_df(query=f"SELECT ID, LastName FROM Contact WHERE ID='0035E00001YGWK3QAP'") + +# Print the object names +print(df) From 5625cee1776787cbea2faf7ab6a79067fb7491e5 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 6 Mar 2023 14:50:24 +0000 Subject: [PATCH 27/37] =?UTF-8?q?=E2=8F=AA=20Removed=20tests=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 411ae019a..348b97132 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -230,25 +230,3 @@ def to_df( records = self.download(query=query, table=table, columns=columns) return pd.DataFrame(records) - - -TABLE_TO_DOWNLOAD = "Account" -TABLE_TO_UPSERT = "Contact" -TEST_LAST_NAME = "viadot-test" -ID_TO_UPSERT = "0035E00001YGWK3QAP" - -s = Salesforce(config_key="salesforce_dev") -data = { - "LastName": [TEST_LAST_NAME], - "SAPContactId__c": ["111"], -} -df = pd.DataFrame(data=data) -import pdb - -pdb.set_trace() -s.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPContactId__c") - -# df = s.to_df(query=f"SELECT ID, LastName FROM Contact WHERE ID='0035E00001YGWK3QAP'") - -# Print the object names -print(df) From 4215dea7f100786defa7dddc4222ca483cc92a48 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 9 Mar 2023 09:14:29 +0000 Subject: [PATCH 28/37] =?UTF-8?q?=E2=9C=85=20Chenged=20testing=20structure?= =?UTF-8?q?=20in=20upsert=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 82 +++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 1c11af7d0..b69acdb54 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -5,7 +5,6 @@ TABLE_TO_DOWNLOAD = "Account" TABLE_TO_UPSERT = "Contact" TEST_LAST_NAME = "viadot-test" -ID_TO_UPSERT = "0035E00001YGWK3QAP" @pytest.fixture(scope="session") @@ -15,53 +14,76 @@ def salesforce(): @pytest.fixture(scope="session") -def test_df_data(salesforce): +def test_row_creation(salesforce): + + # Creating a test row + test_row = {"LastName": "salesforce-test", "SAPContactId__c": "88111"} + sf = salesforce.salesforce + sf.Contact.create(test_row) + + yield + + # Finding the Id of a created row in a table and removing it + result = sf.query(f"SELECT Id FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'") + sf.Contact.delete(result["records"][0]["Id"]) + + +def test_upsert_external_id_correct(salesforce, test_row_creation): + data = { - "Id": [ID_TO_UPSERT], "LastName": [TEST_LAST_NAME], + "SAPContactId__c": ["88111"], } df = pd.DataFrame(data=data) - yield df + try: + salesforce.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPContactId__c") + except Exception as exception: + raise exception + + sf = salesforce.salesforce + result = sf.query( + f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" + ) + + assert result["records"][0]["LastName"] == TEST_LAST_NAME + + +def test_upsert_row_id(salesforce, test_row_creation): sf = salesforce.salesforce - sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"}) + created_row = sf.query( + f"SELECT Id FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" + ) + created_row_id = created_row["records"][0]["Id"] -@pytest.fixture(scope="session") -def test_df_external(salesforce): data = { + "Id": [created_row_id], "LastName": [TEST_LAST_NAME], - "SAPContactId__c": ["111"], } df = pd.DataFrame(data=data) - yield df - sf = salesforce.salesforce - sf.Contact.update(ID_TO_UPSERT, {"LastName": "LastName"}) - - -def test_upsert_external_id_correct(salesforce, test_df_external): try: - salesforce.upsert( - df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPContactId__c" - ) + salesforce.upsert(df=df, table=TABLE_TO_UPSERT) except Exception as exception: raise exception - sf = salesforce.salesforce result = sf.query( - f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'" + f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE Id ='{created_row_id}'" ) assert result["records"][0]["LastName"] == TEST_LAST_NAME -def test_upsert_external_id_wrong(salesforce, test_df_external): +def test_upsert_external_id_wrong(salesforce, test_row_creation): + data = { + "LastName": [TEST_LAST_NAME], + "SAPContactId__c": ["88111"], + } + df = pd.DataFrame(data=data) with pytest.raises(ValueError): - salesforce.upsert( - df=test_df_external, table=TABLE_TO_UPSERT, external_id="SAPId" - ) + salesforce.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPId") def test_download_no_query(salesforce): @@ -81,17 +103,3 @@ def test_to_df(salesforce): assert df.empty == False assert len(df.columns) == 98 assert len(df.values) >= 1000 - - -def test_upsert_row_id(salesforce, test_df_data): - try: - salesforce.upsert(df=test_df_data, table=TABLE_TO_UPSERT) - except Exception as exception: - raise exception - - sf = salesforce.salesforce - result = sf.query( - f"SELECT ID, LastName FROM {TABLE_TO_UPSERT} WHERE ID='{ID_TO_UPSERT}'" - ) - - assert result["records"][0]["LastName"] == TEST_LAST_NAME From b6e311e43d8a83f64c00a58fcc17fc4c52db9140 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 9 Mar 2023 11:02:58 +0000 Subject: [PATCH 29/37] =?UTF-8?q?=F0=9F=8E=A8Rename=20`external=5Fid`=20va?= =?UTF-8?q?r=20to=20`external=5Fid=5Fcolumn`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 348b97132..a3a1699ca 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -66,7 +66,7 @@ def upsert( self, df: pd.DataFrame, table: str, - external_id: str = None, + external_id_column: str = None, raise_on_error: bool = False, ) -> None: """ @@ -75,7 +75,7 @@ def upsert( Args: df (pd.DataFrame): The DataFrame to upsert. Only a single row can be upserted with this function. table (str): The table where the data should be upserted. - external_id (str, optional): The external ID to use for the upsert. Defaults to None. + external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. If False, we only display a warning. Defaults to False. """ @@ -83,9 +83,9 @@ def upsert( self.logger.info("No data to upsert.") return - if external_id and external_id not in df.columns: + if external_id_column and external_id_column not in df.columns: raise ValueError( - f"Passed DataFrame does not contain column '{external_id}'." + f"Passed DataFrame does not contain column '{external_id_column}'." ) table_to_upsert = getattr(self.salesforce, table) @@ -94,11 +94,11 @@ def upsert( for record in records_cp: - if external_id: + if external_id_column: # If the specified external ID is on the upsert line and has a value, it will be used as merge_key - if record[external_id] is not None: - merge_key = f"{external_id}/{record[external_id]}" - record.pop(external_id) + if record[external_id_column] is not None: + merge_key = f"{external_id_column}/{record[external_id_column]}" + record.pop(external_id_column) else: merge_key = record.pop("Id") @@ -132,7 +132,7 @@ def bulk_upsert( self, df: pd.DataFrame, table: str, - external_id: str = None, + external_id_column: str = None, batch_size: int = 10000, raise_on_error: bool = False, ) -> None: @@ -142,7 +142,7 @@ def bulk_upsert( Args: df (pd.DataFrame): The DataFrame to upsert. table (str): The table where the data should be upserted. - external_id (str, optional): The external ID to use for the upsert. Defaults to None. + external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. batch_size (int, optional): Number of records to be included in each batch of records that are sent to the Salesforce API for processing. Defaults to 10000. raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. @@ -152,15 +152,17 @@ def bulk_upsert( self.logger.info("No data to upsert.") return - if external_id and external_id not in df.columns: + if external_id_column and external_id_column not in df.columns: raise ValueError( - f"Passed DataFrame does not contain column '{external_id}'." + f"Passed DataFrame does not contain column '{external_id_column}'." ) records = df.to_dict("records") try: response_code = self.salesforce.bulk.__getattr__(table).upsert( - data=records, external_id_field=external_id, batch_size=batch_size + data=records, + external_id_column_field=external_id_column, + batch_size=batch_size, ) except SalesforceMalformedRequest as e: # Bulk insert didn't work at all. From c8bdf4e81fad33033ce2bf5fee2f7aa48e5355cb Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 9 Mar 2023 11:04:29 +0000 Subject: [PATCH 30/37] =?UTF-8?q?=E2=9C=85=20Added=20inserting=20two=20row?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index b69acdb54..df91c347d 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -28,7 +28,7 @@ def test_row_creation(salesforce): sf.Contact.delete(result["records"][0]["Id"]) -def test_upsert_external_id_correct(salesforce, test_row_creation): +def test_upsert_external_id_column(salesforce, test_row_creation): data = { "LastName": [TEST_LAST_NAME], @@ -37,7 +37,9 @@ def test_upsert_external_id_correct(salesforce, test_row_creation): df = pd.DataFrame(data=data) try: - salesforce.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPContactId__c") + salesforce.upsert( + df=df, table=TABLE_TO_UPSERT, external_id_column="SAPContactId__c" + ) except Exception as exception: raise exception @@ -76,14 +78,39 @@ def test_upsert_row_id(salesforce, test_row_creation): assert result["records"][0]["LastName"] == TEST_LAST_NAME -def test_upsert_external_id_wrong(salesforce, test_row_creation): +def test_upsert_non_existent_row(salesforce): + + data = { + "LastName": ["viadot-insert-1", "viadot-insert-2"], + "SAPContactId__c": ["88120", "88121"], + } + df = pd.DataFrame(data=data) + + try: + salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") + except Exception as exception: + raise exception + + sf = salesforce.salesforce + created_rows = sf.query( + f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c in ('88120','88121')" + ) + + assert created_rows["records"][0]["LastName"] == "viadot-insert-1" + assert created_rows["records"][1]["LastName"] == "viadot-insert-2" + + sf.Contact.delete(created_rows["records"][0]["Id"]) + sf.Contact.delete(created_rows["records"][1]["Id"]) + + +def test_upsert_external_id_column_wrong(salesforce, test_row_creation): data = { "LastName": [TEST_LAST_NAME], "SAPContactId__c": ["88111"], } df = pd.DataFrame(data=data) with pytest.raises(ValueError): - salesforce.upsert(df=df, table=TABLE_TO_UPSERT, external_id="SAPId") + salesforce.upsert(df=df, table=TABLE_TO_UPSERT, external_id_column="SAPId") def test_download_no_query(salesforce): From 5adfbe2b9b0de1c1cfc632b19bfc3849512b3f98 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 9 Mar 2023 11:29:11 +0000 Subject: [PATCH 31/37] =?UTF-8?q?=E2=9C=85=20Improved=20`test=5Fto=5Fdf`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index df91c347d..e1da4c3fd 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -6,6 +6,10 @@ TABLE_TO_UPSERT = "Contact" TEST_LAST_NAME = "viadot-test" +EXPECTED_VALUE = pd.DataFrame( + {"LastName": ["salesforce-test"], "SAPContactId__c": ["88111"]} +) + @pytest.fixture(scope="session") def salesforce(): @@ -124,9 +128,8 @@ def test_download_with_query(salesforce): assert len(records) > 0 -def test_to_df(salesforce): - df = salesforce.to_df(table=TABLE_TO_DOWNLOAD) - print(len(df.values)) - assert df.empty == False - assert len(df.columns) == 98 - assert len(df.values) >= 1000 +def test_to_df(salesforce, test_row_creation): + df = salesforce.to_df( + query=f"SELECT LastName, SAPContactId__c FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" + ) + assert df.equals(EXPECTED_VALUE) From 570b5797c61f1e1ca58ec24b03d4d67b96d66706 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Thu, 9 Mar 2023 13:44:14 +0000 Subject: [PATCH 32/37] =?UTF-8?q?=E2=9C=85=F0=9F=93=9DImproved=20docstings?= =?UTF-8?q?=20in=20Salesforce=20class?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index a3a1699ca..9e035ecb3 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -70,10 +70,11 @@ def upsert( raise_on_error: bool = False, ) -> None: """ - Performs upsert operations on the selected row in the table. + Upsert the DataFrame to Salesforce. The upsert is performed on a single record at a time. + Using an upsert operation gives you more control over logging and error handling than using Bulk upsert. Args: - df (pd.DataFrame): The DataFrame to upsert. Only a single row can be upserted with this function. + df (pd.DataFrame): Dataframe containing the rows to upsert. table (str): The table where the data should be upserted. external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. @@ -137,10 +138,13 @@ def bulk_upsert( raise_on_error: bool = False, ) -> None: """ - Performs upsert operations on multiple rows in a table. + Performs a Bulk upsert to Salesforce of the data given in the Dataframe. + Bulk upsert is performed on multiple records simultaneously, it is usually used when + there is a need to insert or update multiple records in a single transaction, + which can be more efficient and reduce the number of API calls required. Args: - df (pd.DataFrame): The DataFrame to upsert. + df (pd.DataFrame): Dataframe containing the rows to Bulk upsert. table (str): The table where the data should be upserted. external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. batch_size (int, optional): Number of records to be included in each batch of records @@ -189,7 +193,7 @@ def download( self, query: str = None, table: str = None, columns: List[str] = None ) -> List[OrderedDict]: """ - Dowload all data from the indicated table or the result of the specified query. + Download all data from the indicated table or the result of the specified query. Args: query (str, optional): Query for download the specific data. Defaults to None. @@ -218,7 +222,7 @@ def to_df( columns: List[str] = None, ) -> pd.DataFrame: """ - Converts the List returned by the download functions to a DataFrame. + Downloads the indicated data and returns the Dataframe. Args: query (str, optional): Query for download the specific data. Defaults to None. From 4415009124ca80f99cfffd3a087d22bbb8ee6f64 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Fri, 10 Mar 2023 14:04:53 +0000 Subject: [PATCH 33/37] =?UTF-8?q?=F0=9F=90=9B=20Corrected=20variable=20nam?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 9e035ecb3..94eb80987 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -165,7 +165,7 @@ def bulk_upsert( try: response_code = self.salesforce.bulk.__getattr__(table).upsert( data=records, - external_id_column_field=external_id_column, + external_id_field=external_id_column, batch_size=batch_size, ) except SalesforceMalformedRequest as e: From 990775a64baf15b9f9fb5eb1b7d19dbc24a53dba Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Fri, 10 Mar 2023 14:14:08 +0000 Subject: [PATCH 34/37] =?UTF-8?q?=E2=9C=85=20Added=20bulk=20upsert=20test?= =?UTF-8?q?=20and=20refeactored=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 163 ++++++++++++++++++++++------------ 1 file changed, 106 insertions(+), 57 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index e1da4c3fd..09decc551 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -7,110 +7,150 @@ TEST_LAST_NAME = "viadot-test" EXPECTED_VALUE = pd.DataFrame( - {"LastName": ["salesforce-test"], "SAPContactId__c": ["88111"]} + {"LastName": ["salesforce-test"], "SAPContactId__c": ["8811111"]} ) +TEST_ROW = {"LastName": "salesforce-test", "SAPContactId__c": "8811111"} + +TWO_TEST_ROWS_INSERT = { + "LastName": ["viadot-insert-1", "viadot-insert-2"], + "SAPContactId__c": ["8812000", "8812100"], +} + +TWO_TEST_ROWS_UPDATE = { + "LastName": ["viadot-update-1", "viadot-update-2"], + "SAPContactId__c": ["8812000", "8812100"], +} + + +def get_tested_records(salesforce, multiple_rows=False): + sf = salesforce.salesforce + if multiple_rows: + result = sf.query( + f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c in ('8812000','8812100')" + ) + else: + result = sf.query( + f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='8811111'" + ) + + return result["records"] + @pytest.fixture(scope="session") def salesforce(): + s = Salesforce(config_key="salesforce_dev") - yield s + yield s -@pytest.fixture(scope="session") -def test_row_creation(salesforce): + sf = s.salesforce + result = sf.query( + f"SELECT Id FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c in ('8812000','8812100','8811111') " + ) - # Creating a test row - test_row = {"LastName": "salesforce-test", "SAPContactId__c": "88111"} - sf = salesforce.salesforce - sf.Contact.create(test_row) + # Deletes test rows from Salesforce if any remain + nr_rows = len(result["records"]) + for nr in range(nr_rows): + sf.Contact.delete(result["records"][nr]["Id"]) - yield - # Finding the Id of a created row in a table and removing it - result = sf.query(f"SELECT Id FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'") - sf.Contact.delete(result["records"][0]["Id"]) +def test_upsert_row_id(salesforce): + assert not get_tested_records(salesforce) -def test_upsert_external_id_column(salesforce, test_row_creation): + sf = salesforce.salesforce + sf.Contact.create(TEST_ROW) data = { + "Id": [get_tested_records(salesforce)[0]["Id"]], "LastName": [TEST_LAST_NAME], - "SAPContactId__c": ["88111"], } df = pd.DataFrame(data=data) try: - salesforce.upsert( - df=df, table=TABLE_TO_UPSERT, external_id_column="SAPContactId__c" - ) + salesforce.upsert(df=df, table=TABLE_TO_UPSERT) except Exception as exception: raise exception - sf = salesforce.salesforce - result = sf.query( - f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" - ) + updated_row = get_tested_records(salesforce) - assert result["records"][0]["LastName"] == TEST_LAST_NAME + assert updated_row[0]["LastName"] == TEST_LAST_NAME + sf.Contact.delete(updated_row[0]["Id"]) -def test_upsert_row_id(salesforce, test_row_creation): - sf = salesforce.salesforce - created_row = sf.query( - f"SELECT Id FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" - ) +def test_upsert(salesforce): - created_row_id = created_row["records"][0]["Id"] + assert not get_tested_records(salesforce, multiple_rows=True) - data = { - "Id": [created_row_id], - "LastName": [TEST_LAST_NAME], - } - df = pd.DataFrame(data=data) + df = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) try: - salesforce.upsert(df=df, table=TABLE_TO_UPSERT) + salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") except Exception as exception: raise exception - result = sf.query( - f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE Id ='{created_row_id}'" - ) + inserted_rows = get_tested_records(salesforce, multiple_rows=True) + assert inserted_rows[0]["LastName"] == "viadot-insert-1" + assert inserted_rows[1]["LastName"] == "viadot-insert-2" - assert result["records"][0]["LastName"] == TEST_LAST_NAME + df = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) + try: + salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") + except Exception as exception: + raise exception -def test_upsert_non_existent_row(salesforce): + updated_rows = get_tested_records(salesforce, multiple_rows=True) + assert updated_rows[0]["LastName"] == "viadot-update-1" + assert updated_rows[1]["LastName"] == "viadot-update-2" - data = { - "LastName": ["viadot-insert-1", "viadot-insert-2"], - "SAPContactId__c": ["88120", "88121"], - } - df = pd.DataFrame(data=data) + sf = salesforce.salesforce + sf.Contact.delete(inserted_rows[0]["Id"]) + sf.Contact.delete(inserted_rows[1]["Id"]) + + +def test_bulk_upsert(salesforce): + + assert not get_tested_records(salesforce, multiple_rows=True) + + df_insert = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) try: - salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") + salesforce.bulk_upsert( + df=df_insert, table="Contact", external_id_column="SAPContactId__c" + ) except Exception as exception: raise exception - sf = salesforce.salesforce - created_rows = sf.query( - f"SELECT Id, LastName FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c in ('88120','88121')" - ) + inserted_rows = get_tested_records(salesforce, multiple_rows=True) + + assert inserted_rows[0]["LastName"] == "viadot-insert-1" + assert inserted_rows[1]["LastName"] == "viadot-insert-2" - assert created_rows["records"][0]["LastName"] == "viadot-insert-1" - assert created_rows["records"][1]["LastName"] == "viadot-insert-2" + df_update = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) + + try: + salesforce.bulk_upsert( + df=df_update, table="Contact", external_id_column="SAPContactId__c" + ) + except Exception as exception: + raise exception - sf.Contact.delete(created_rows["records"][0]["Id"]) - sf.Contact.delete(created_rows["records"][1]["Id"]) + updated_rows = get_tested_records(salesforce, multiple_rows=True) + assert updated_rows[0]["LastName"] == "viadot-update-1" + assert updated_rows[1]["LastName"] == "viadot-update-2" + sf = salesforce.salesforce + sf.Contact.delete(updated_rows[0]["Id"]) + sf.Contact.delete(updated_rows[1]["Id"]) -def test_upsert_external_id_column_wrong(salesforce, test_row_creation): + +def test_upsert_external_id_column_wrong(salesforce): data = { "LastName": [TEST_LAST_NAME], - "SAPContactId__c": ["88111"], + "SAPContactId__c": ["8811111"], } df = pd.DataFrame(data=data) with pytest.raises(ValueError): @@ -128,8 +168,17 @@ def test_download_with_query(salesforce): assert len(records) > 0 -def test_to_df(salesforce, test_row_creation): +def test_to_df(salesforce): + + assert not get_tested_records(salesforce) + + sf = salesforce.salesforce + sf.Contact.create(TEST_ROW) + df = salesforce.to_df( - query=f"SELECT LastName, SAPContactId__c FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='88111'" + query=f"SELECT LastName, SAPContactId__c FROM {TABLE_TO_UPSERT} WHERE SAPContactId__c='8811111'" ) + assert df.equals(EXPECTED_VALUE) + + sf.Contact.delete(get_tested_records(salesforce)[0]["Id"]) From 03d07c4da6508d2b436bf0a4bb09e4f7c3a3db54 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Mon, 20 Mar 2023 09:45:28 +0100 Subject: [PATCH 35/37] =?UTF-8?q?=F0=9F=93=9D=20Updated=20docstrings=20in?= =?UTF-8?q?=20Salesforce=20source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/sources/salesforce.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/viadot/sources/salesforce.py b/viadot/sources/salesforce.py index 94eb80987..7348187e0 100644 --- a/viadot/sources/salesforce.py +++ b/viadot/sources/salesforce.py @@ -14,8 +14,8 @@ class Salesforce(Source): Args: domain (str, optional): Domain of a connection. Defaults to 'test' (sandbox). - Can be added only if built-in username/password/security token is provided. - client_id (str, optional): Client id to keep the track of API calls. + Can only be added if a username/password/security token is provided. + client_id (str, optional): Client id, keep track of API calls. Defaults to 'viadot'. env (Literal["DEV", "QA", "PROD"], optional): Environment information, provides information about credential and connection configuration. Defaults to 'DEV'. @@ -71,10 +71,10 @@ def upsert( ) -> None: """ Upsert the DataFrame to Salesforce. The upsert is performed on a single record at a time. - Using an upsert operation gives you more control over logging and error handling than using Bulk upsert. + Using an upsert operation gives you more control over logging and error handling than using bulk upsert. Args: - df (pd.DataFrame): Dataframe containing the rows to upsert. + df (pd.DataFrame): Pandas DataFrame specified the rows to upsert. table (str): The table where the data should be upserted. external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. raise_on_error (bool, optional): Whether to raise an exception if a row upsert fails. @@ -138,13 +138,13 @@ def bulk_upsert( raise_on_error: bool = False, ) -> None: """ - Performs a Bulk upsert to Salesforce of the data given in the Dataframe. + Performs a bulk upsert to Salesforce of the data given in the DataFrame. Bulk upsert is performed on multiple records simultaneously, it is usually used when there is a need to insert or update multiple records in a single transaction, which can be more efficient and reduce the number of API calls required. Args: - df (pd.DataFrame): Dataframe containing the rows to Bulk upsert. + df (pd.DataFrame): Pandas DataFrame specified the rows to bulk upsert. table (str): The table where the data should be upserted. external_id_column (str, optional): The external ID to use for the upsert. Defaults to None. batch_size (int, optional): Number of records to be included in each batch of records @@ -169,7 +169,7 @@ def bulk_upsert( batch_size=batch_size, ) except SalesforceMalformedRequest as e: - # Bulk insert didn't work at all. + # Bulk upsert didn't work at all. raise ValueError(f"Upsert of records failed: {e}") from e self.logger.info(f"Successfully upserted bulk records.") @@ -196,10 +196,10 @@ def download( Download all data from the indicated table or the result of the specified query. Args: - query (str, optional): Query for download the specific data. Defaults to None. + query (str, optional): The query to be used to download the data. Defaults to None. table (str, optional): Table name. Defaults to None. - columns (List[str], optional): List of columns which are needed, - requires table argument. Defaults to None. + columns (List[str], optional): List of required columns. Requires `table` to be specified. + Defaults to None. Returns: List[OrderedDict]: Selected rows from Salesforce. @@ -222,13 +222,13 @@ def to_df( columns: List[str] = None, ) -> pd.DataFrame: """ - Downloads the indicated data and returns the Dataframe. + Downloads the indicated data and returns the DataFrame. Args: - query (str, optional): Query for download the specific data. Defaults to None. + query (str, optional): The query to be used to download the data. Defaults to None. table (str, optional): Table name. Defaults to None. - columns (List[str], optional): List of columns which are needed, - requires table argument. Defaults to None. + columns (List[str], optional): List of required columns. Requires `table` to be specified. + Defaults to None. Returns: pd.DataFrame: Selected rows from Salesforce. From 7885ba7946199a4958028b888c836b6082028231 Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Tue, 21 Mar 2023 15:15:55 +0100 Subject: [PATCH 36/37] =?UTF-8?q?=E2=9C=85=20Removed=20try=20except=20from?= =?UTF-8?q?=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 09decc551..399e79f51 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -68,10 +68,7 @@ def test_upsert_row_id(salesforce): } df = pd.DataFrame(data=data) - try: - salesforce.upsert(df=df, table=TABLE_TO_UPSERT) - except Exception as exception: - raise exception + salesforce.upsert(df=df, table=TABLE_TO_UPSERT) updated_row = get_tested_records(salesforce) @@ -86,10 +83,7 @@ def test_upsert(salesforce): df = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) - try: - salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") - except Exception as exception: - raise exception + salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") inserted_rows = get_tested_records(salesforce, multiple_rows=True) assert inserted_rows[0]["LastName"] == "viadot-insert-1" @@ -97,10 +91,7 @@ def test_upsert(salesforce): df = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) - try: - salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") - except Exception as exception: - raise exception + salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") updated_rows = get_tested_records(salesforce, multiple_rows=True) assert updated_rows[0]["LastName"] == "viadot-update-1" @@ -117,12 +108,9 @@ def test_bulk_upsert(salesforce): df_insert = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) - try: - salesforce.bulk_upsert( - df=df_insert, table="Contact", external_id_column="SAPContactId__c" - ) - except Exception as exception: - raise exception + salesforce.bulk_upsert( + df=df_insert, table="Contact", external_id_column="SAPContactId__c" + ) inserted_rows = get_tested_records(salesforce, multiple_rows=True) @@ -131,12 +119,9 @@ def test_bulk_upsert(salesforce): df_update = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) - try: - salesforce.bulk_upsert( - df=df_update, table="Contact", external_id_column="SAPContactId__c" - ) - except Exception as exception: - raise exception + salesforce.bulk_upsert( + df=df_update, table="Contact", external_id_column="SAPContactId__c" + ) updated_rows = get_tested_records(salesforce, multiple_rows=True) assert updated_rows[0]["LastName"] == "viadot-update-1" From ea0fa2a9f12146b569af6519a01069b66faf498d Mon Sep 17 00:00:00 2001 From: djagoda881 Date: Tue, 21 Mar 2023 15:37:26 +0100 Subject: [PATCH 37/37] =?UTF-8?q?=E2=9C=85=20Renamed=20test=20and=20fixed?= =?UTF-8?q?=20typos?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_salesforce.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/unit/test_salesforce.py b/tests/unit/test_salesforce.py index 399e79f51..22bb8c662 100644 --- a/tests/unit/test_salesforce.py +++ b/tests/unit/test_salesforce.py @@ -81,17 +81,22 @@ def test_upsert(salesforce): assert not get_tested_records(salesforce, multiple_rows=True) - df = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) + df_insert = pd.DataFrame(data=TWO_TEST_ROWS_INSERT) - salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") + salesforce.upsert( + df=df_insert, table="Contact", external_id_column="SAPContactId__c" + ) inserted_rows = get_tested_records(salesforce, multiple_rows=True) + assert inserted_rows[0]["LastName"] == "viadot-insert-1" assert inserted_rows[1]["LastName"] == "viadot-insert-2" - df = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) + df_update = pd.DataFrame(data=TWO_TEST_ROWS_UPDATE) - salesforce.upsert(df=df, table="Contact", external_id_column="SAPContactId__c") + salesforce.upsert( + df=df_update, table="Contact", external_id_column="SAPContactId__c" + ) updated_rows = get_tested_records(salesforce, multiple_rows=True) assert updated_rows[0]["LastName"] == "viadot-update-1" @@ -132,7 +137,7 @@ def test_bulk_upsert(salesforce): sf.Contact.delete(updated_rows[1]["Id"]) -def test_upsert_external_id_column_wrong(salesforce): +def test_upsert_incorrect_external_id_column(salesforce): data = { "LastName": [TEST_LAST_NAME], "SAPContactId__c": ["8811111"], @@ -148,9 +153,9 @@ def test_download_no_query(salesforce): def test_download_with_query(salesforce): - query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD}" + query = f"SELECT Id, Name FROM {TABLE_TO_DOWNLOAD} LIMIT 10" records = salesforce.download(query=query) - assert len(records) > 0 + assert len(records) == 10 def test_to_df(salesforce):