Skip to content

Commit

Permalink
feat: adding support for future oauth facility in salesforce (#2746)
Browse files Browse the repository at this point in the history
* feat: oauth in salesforce

* fix: backward compatibility initial

* fix: editing error class and proxy test case

* fix: access token call for legacy auth

* Update src/v0/destinations/salesforce/utils.js

Co-authored-by: Sudip Paul <[email protected]>

* feat: added missing authorization logic for axios calls

* fix: adding new dest salesforce oauth

* fix: review comment addressed

* feat: review comments addressed

* feat: small edit

* fix: review comments addressed

* Update src/v0/destinations/salesforce/transform.js

Co-authored-by: Sankeerth <[email protected]>

* fix: review comments addressed

* fix: review comments addressed

---------

Co-authored-by: Sudip Paul <[email protected]>
Co-authored-by: Sankeerth <[email protected]>
  • Loading branch information
3 people authored Nov 13, 2023
1 parent e82cef8 commit 916ea4c
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/constants/destinationCanonicalNames.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const DestHandlerMap = {
ga360: 'ga',
salesforce_oauth: 'salesforce',
};

const DestCanonicalNames = {
Expand Down
1 change: 1 addition & 0 deletions src/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"PINTEREST_TAG": true,
"PROFITWELL": true,
"SALESFORCE": true,
"SALESFORCE_OAUTH": true,
"SFMC": true,
"SNAPCHAT_CONVERSION": true,
"TIKTOK_ADS": true,
Expand Down
4 changes: 4 additions & 0 deletions src/v0/destinations/salesforce/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const SF_TOKEN_REQUEST_URL = 'https://login.salesforce.com/services/oauth2/token
const SF_TOKEN_REQUEST_URL_SANDBOX = 'https://test.salesforce.com/services/oauth2/token';

const DESTINATION = 'Salesforce';
const OAUTH = 'oauth';
const LEGACY = 'legacy';

const mappingConfig = getMappingConfig(ConfigCategory, __dirname);

Expand All @@ -37,4 +39,6 @@ module.exports = {
ignoredContactTraits: mappingConfig[ConfigCategory.IGNORE_CONTACT.name],
ACCESS_TOKEN_CACHE_TTL,
DESTINATION,
OAUTH,
LEGACY,
};
74 changes: 52 additions & 22 deletions src/v0/destinations/salesforce/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
generateErrorObject,
isHttpStatusSuccess,
} = require('../../util');
const { getAccessToken, salesforceResponseHandler } = require('./utils');
const { salesforceResponseHandler, collectAuthorizationInfo, getAuthHeader } = require('./utils');
const { handleHttpRequest } = require('../../../adapters/network');
const { JSON_MIME_TYPE } = require('../../util/constant');

Expand All @@ -41,6 +42,7 @@ function responseBuilderSimple(
authorizationData,
mapProperty,
mappedToDestination,
authorizationFlow,
) {
const { salesforceType, salesforceId } = salesforceMap;

Expand Down Expand Up @@ -87,12 +89,12 @@ function responseBuilderSimple(
}

const response = defaultRequestConfig();
const header = {

response.method = defaultPostRequestConfig.requestMethod;
response.headers = {
'Content-Type': JSON_MIME_TYPE,
Authorization: authorizationData.token,
...getAuthHeader({ authorizationFlow, authorizationData }),
};
response.method = defaultPostRequestConfig.requestMethod;
response.headers = header;
response.body.JSON = removeUndefinedValues(rawPayload);
response.endpoint = targetEndpoint;

Expand All @@ -106,24 +108,26 @@ async function getSaleforceIdForRecord(
identifierType,
identifierValue,
destination,
authorizationFlow,
) {
const objSearchUrl = `${authorizationData.instanceUrl}/services/data/v${SF_API_VERSION}/parameterizedSearch/?q=${identifierValue}&sobject=${objectType}&in=${identifierType}&${objectType}.fields=id,${identifierType}`;
const { processedResponse: processedsfSearchResponse } = await handleHttpRequest(
'get',
objSearchUrl,
{
headers: { Authorization: authorizationData.token },
headers: getAuthHeader({ authorizationFlow, authorizationData }),
},
{
destType: 'salesforce',
feature: 'transformation',
},
);
if (processedsfSearchResponse.status !== 200) {
if (!isHttpStatusSuccess(processedsfSearchResponse.status)) {
salesforceResponseHandler(
processedsfSearchResponse,
`:- SALESFORCE SEARCH BY ID`,
destination.ID,
authorizationFlow,
);
}
const searchRecord = processedsfSearchResponse.response?.searchRecords?.find(
Expand All @@ -149,7 +153,12 @@ async function getSaleforceIdForRecord(
// We'll use the Salesforce Object names by removing "Salesforce-" string from the type field
//
// Default Object type will be "Lead" for backward compatibility
async function getSalesforceIdFromPayload(message, authorizationData, destination) {
async function getSalesforceIdFromPayload(
message,
authorizationData,
destination,
authorizationFlow,
) {
// define default map
const salesforceMaps = [];

Expand Down Expand Up @@ -191,6 +200,7 @@ async function getSalesforceIdFromPayload(message, authorizationData, destinatio
identifierType,
id,
destination,
authorizationFlow,
);
}

Expand All @@ -212,21 +222,27 @@ async function getSalesforceIdFromPayload(message, authorizationData, destinatio
throw new InstrumentationError('Invalid Email address for Lead Objet');
}
const leadQueryUrl = `${authorizationData.instanceUrl}/services/data/v${SF_API_VERSION}/parameterizedSearch/?q=${email}&sobject=Lead&Lead.fields=id,IsConverted,ConvertedContactId,IsDeleted`;

// request configuration will be conditional
const { processedResponse: processedLeadQueryResponse } = await handleHttpRequest(
'get',
leadQueryUrl,
{
headers: { Authorization: authorizationData.token },
headers: getAuthHeader({ authorizationFlow, authorizationData }),
},
{
destType: 'salesforce',
feature: 'transformation',
},
);

if (processedLeadQueryResponse.status !== 200) {
salesforceResponseHandler(processedLeadQueryResponse, `:- during Lead Query`, destination.ID);
if (!isHttpStatusSuccess(processedLeadQueryResponse.status)) {
salesforceResponseHandler(
processedLeadQueryResponse,
`:- during Lead Query`,
destination.ID,
authorizationFlow,
);
}

if (processedLeadQueryResponse.response.searchRecords.length > 0) {
Expand Down Expand Up @@ -262,7 +278,7 @@ async function getSalesforceIdFromPayload(message, authorizationData, destinatio
}

// Function for handling identify events
async function processIdentify(message, authorizationData, destination) {
async function processIdentify(message, authorizationData, destination, authorizationFlow) {
const mapProperty =
destination.Config.mapProperty === undefined ? true : destination.Config.mapProperty;
// check the traits before hand
Expand All @@ -283,7 +299,12 @@ async function processIdentify(message, authorizationData, destination) {
const responseData = [];

// get salesforce object map
const salesforceMaps = await getSalesforceIdFromPayload(message, authorizationData, destination);
const salesforceMaps = await getSalesforceIdFromPayload(
message,
authorizationData,
destination,
authorizationFlow,
);

// iterate over the object types found
salesforceMaps.forEach((salesforceMap) => {
Expand All @@ -295,6 +316,7 @@ async function processIdentify(message, authorizationData, destination) {
authorizationData,
mapProperty,
mappedToDestination,
authorizationFlow,
),
);
});
Expand All @@ -304,20 +326,24 @@ async function processIdentify(message, authorizationData, destination) {

// Generic process function which invokes specific handler functions depending on message type
// and event type where applicable
async function processSingleMessage(message, authorizationData, destination) {
async function processSingleMessage(message, authorizationData, destination, authorizationFlow) {
let response;
if (message.type === EventType.IDENTIFY) {
response = await processIdentify(message, authorizationData, destination);
response = await processIdentify(message, authorizationData, destination, authorizationFlow);
} else {
throw new InstrumentationError(`message type ${message.type} is not supported`);
}
return response;
}

async function process(event) {
// Get the authorization header if not available
const authorizationData = await getAccessToken(event.destination);
const response = await processSingleMessage(event.message, authorizationData, event.destination);
const authInfo = await collectAuthorizationInfo(event);
const response = await processSingleMessage(
event.message,
authInfo.authorizationData,
event.destination,
authInfo.authorizationFlow,
);
return response;
}

Expand All @@ -326,10 +352,9 @@ const processRouterDest = async (inputs, reqMetadata) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

let authorizationData;
let authInfo;
try {
authorizationData = await getAccessToken(inputs[0].destination);
authInfo = await collectAuthorizationInfo(inputs[0]);
} catch (error) {
const errObj = generateErrorObject(error);
const respEvents = getErrorRespEvents(
Expand All @@ -351,7 +376,12 @@ const processRouterDest = async (inputs, reqMetadata) => {

// unprocessed payload
return getSuccessRespEvents(
await processSingleMessage(input.message, authorizationData, input.destination),
await processSingleMessage(
input.message,
authInfo.authorizationData,
input.destination,
authInfo.authorizationFlow,
),
[input.metadata],
input.destination,
);
Expand Down
54 changes: 48 additions & 6 deletions src/v0/destinations/salesforce/utils.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
const { RetryableError, ThrottledError, AbortedError } = require('@rudderstack/integrations-lib');
const { handleHttpRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess } = require('../../util');
const {
isHttpStatusSuccess,
getAuthErrCategoryFromStCode,
isDefinedAndNotNull,
} = require('../../util');
const Cache = require('../../util/cache');
const {
ACCESS_TOKEN_CACHE_TTL,
SF_TOKEN_REQUEST_URL_SANDBOX,
SF_TOKEN_REQUEST_URL,
DESTINATION,
LEGACY,
OAUTH,
} = require('./config');

const ACCESS_TOKEN_CACHE = new Cache(ACCESS_TOKEN_CACHE_TTL);
Expand All @@ -19,21 +25,24 @@ const ACCESS_TOKEN_CACHE = new Cache(ACCESS_TOKEN_CACHE_TTL);
* @param {*} stage
* @param {String} authKey
*/
const salesforceResponseHandler = (destResponse, sourceMessage, authKey) => {
const salesforceResponseHandler = (destResponse, sourceMessage, authKey, authorizationFlow) => {
const { status, response } = destResponse;

// if the response from destination is not a success case build an explicit error
if (!isHttpStatusSuccess(status) && status >= 400) {
const matchErrorCode = (errorCode) =>
response && Array.isArray(response) && response.some((resp) => resp?.errorCode === errorCode);
if (status === 401 && authKey && matchErrorCode('INVALID_SESSION_ID')) {
// checking for invalid/expired token errors and evicting cache in that case
// rudderJobMetadata contains some destination info which is being used to evict the cache
ACCESS_TOKEN_CACHE.del(authKey);
if (authorizationFlow === LEGACY) {
// checking for invalid/expired token errors and evicting cache in that case
// rudderJobMetadata contains some destination info which is being used to evict the cache
ACCESS_TOKEN_CACHE.del(authKey);
}
throw new RetryableError(
`${DESTINATION} Request Failed - due to "INVALID_SESSION_ID", (Retryable) ${sourceMessage}`,
500,
destResponse,
authorizationFlow === LEGACY ? '' : getAuthErrCategoryFromStCode(status),
);
} else if (status === 403 && matchErrorCode('REQUEST_LIMIT_EXCEEDED')) {
// If the error code is REQUEST_LIMIT_EXCEEDED, you’ve exceeded API request limits in your org.
Expand Down Expand Up @@ -89,6 +98,11 @@ const salesforceResponseHandler = (destResponse, sourceMessage, authKey) => {
* @param {*} destination
* @returns
*/
const getAccessTokenOauth = (metadata) => ({
token: metadata.secret?.access_token,
instanceUrl: metadata.secret?.instance_url,
});

const getAccessToken = async (destination) => {
const accessTokenKey = destination.ID;

Expand Down Expand Up @@ -122,6 +136,7 @@ const getAccessToken = async (destination) => {
processedResponse,
`:- authentication failed during fetching access token.`,
accessTokenKey,
LEGACY,
);
}
const token = httpResponse.response.data;
Expand All @@ -131,6 +146,7 @@ const getAccessToken = async (destination) => {
processedResponse,
`:- authentication failed could not retrieve authorization token.`,
accessTokenKey,
LEGACY,
);
}
return {
Expand All @@ -140,4 +156,30 @@ const getAccessToken = async (destination) => {
});
};

module.exports = { getAccessToken, salesforceResponseHandler };
const collectAuthorizationInfo = async (event) => {
let authorizationFlow;
let authorizationData;
if (isDefinedAndNotNull(event.metadata?.secret)) {
authorizationFlow = OAUTH;
authorizationData = getAccessTokenOauth(event.metadata);
} else {
authorizationFlow = LEGACY;
authorizationData = await getAccessToken(event.destination);
}
return { authorizationFlow, authorizationData };
};

const getAuthHeader = (authInfo) => {
const { authorizationFlow, authorizationData } = authInfo;
return authorizationFlow === OAUTH
? { Authorization: `Bearer ${authorizationData.token}` }
: { Authorization: authorizationData.token };
};

module.exports = {
getAccessTokenOauth,
salesforceResponseHandler,
getAccessToken,
collectAuthorizationInfo,
getAuthHeader,
};
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export const data = [
body: {
output: {
status: 500,
authErrorCategory: 'REFRESH_TOKEN',
message:
'Salesforce Request Failed - due to "INVALID_SESSION_ID", (Retryable) during Salesforce Response Handling',
destinationResponse: {
Expand All @@ -128,12 +129,12 @@ export const data = [
errorCode: 'INVALID_SESSION_ID',
},
],
status: 401,
rudderJobMetadata: {
destInfo: {
authKey: '2HezPl1w11opbFSxnLDEgZ7kWTf',
},
},
status: 401,
},
statTags: {
destType: 'SALESFORCE',
Expand Down
Loading

0 comments on commit 916ea4c

Please sign in to comment.