From e6630d410ccf9af113e24b4c8379d91735b3201a Mon Sep 17 00:00:00 2001 From: Sankeerth Date: Thu, 20 Jun 2024 12:15:54 +0530 Subject: [PATCH] chore: enable labels for api calls - 3 (#3483) * chore: enable labels for api calls - 3 * chore: propagate metadata for user destination * chore: propagate metadata in braze * chore: propagate metadata for api calls in hubspot * chore: propagate metadata for labels in api calls for - pardot - rakuten - snapchat_custom_audience - the_trade_desk - yahoo_dsp * chore: propagate metadata in zendesk Signed-off-by: Sai Sankeerth * chore: propagate metadata in gainsight --------- Signed-off-by: Sai Sankeerth Co-authored-by: Sai Sankeerth --- src/v0/destinations/braze/braze.util.test.js | 10 +- src/v0/destinations/braze/transform.js | 5 +- src/v0/destinations/braze/util.js | 7 +- src/v0/destinations/gainsight/transform.js | 12 +-- src/v0/destinations/gainsight/util.js | 9 +- src/v0/destinations/hs/HSTransform-v1.js | 13 ++- src/v0/destinations/hs/HSTransform-v2.js | 10 +- src/v0/destinations/hs/transform.js | 31 ++++--- src/v0/destinations/hs/util.js | 25 +++-- src/v0/destinations/marketo/transform.js | 54 +++++++---- src/v0/destinations/marketo/util.js | 6 +- .../marketo_static_list/transform.js | 6 +- src/v0/destinations/pardot/networkHandler.js | 2 + src/v0/destinations/rakuten/networkHandler.js | 2 + .../networkHandler.js | 2 + .../the_trade_desk/networkHandler.js | 2 + src/v0/destinations/user/transform.js | 40 ++++---- src/v0/destinations/user/utils.js | 35 ++++--- src/v0/destinations/yahoo_dsp/transform.js | 10 +- src/v0/destinations/yahoo_dsp/util.js | 3 +- src/v0/destinations/zendesk/transform.js | 92 ++++++++++++++----- 21 files changed, 241 insertions(+), 135 deletions(-) diff --git a/src/v0/destinations/braze/braze.util.test.js b/src/v0/destinations/braze/braze.util.test.js index 460f1db565..f2726c3283 100644 --- a/src/v0/destinations/braze/braze.util.test.js +++ b/src/v0/destinations/braze/braze.util.test.js @@ -261,7 +261,7 @@ describe('dedup utility tests', () => { }; // Call the function - const users = await BrazeDedupUtility.doApiLookup(identfierChunks, destination); + const users = await BrazeDedupUtility.doApiLookup(identfierChunks, { destination }); // Check the result expect(users).toEqual([ @@ -399,7 +399,9 @@ describe('dedup utility tests', () => { }, })); - const chunkedUserData = await BrazeDedupUtility.doApiLookup(identifierChunks, destination); + const chunkedUserData = await BrazeDedupUtility.doApiLookup(identifierChunks, { + destination, + }); const result = _.flatMap(chunkedUserData); expect(result).toHaveLength(110); expect(handleHttpRequest).toHaveBeenCalledTimes(3); @@ -455,7 +457,7 @@ describe('dedup utility tests', () => { }, })); - const users = await BrazeDedupUtility.doApiLookup(chunks, destination); + const users = await BrazeDedupUtility.doApiLookup(chunks, { destination }); expect(handleHttpRequest).toHaveBeenCalledTimes(2); // Assert that the first chunk was successful and the second failed @@ -522,7 +524,7 @@ describe('dedup utility tests', () => { [{ alias_name: 'alias1', alias_label: 'rudder_id' }], [{ alias_name: 'alias2', alias_label: 'rudder_id' }], ], - { Config: { restApiKey: 'xyz' } }, + { destination: { Config: { restApiKey: 'xyz' } } }, ); // restore the original implementation of the mocked functions diff --git a/src/v0/destinations/braze/transform.js b/src/v0/destinations/braze/transform.js index 3d6a99d424..155f32c145 100644 --- a/src/v0/destinations/braze/transform.js +++ b/src/v0/destinations/braze/transform.js @@ -203,7 +203,7 @@ function getUserAttributesObject(message, mappingJson, destination) { * @param {*} message * @param {*} destination */ -async function processIdentify(message, destination) { +async function processIdentify({ message, destination, metadata }) { const identifyPayload = getIdentifyPayload(message); const identifyEndpoint = getIdentifyEndpoint(getEndpointFromConfig(destination)); const { processedResponse: brazeIdentifyResp } = await handleHttpRequest( @@ -223,6 +223,7 @@ async function processIdentify(message, destination) { requestMethod: 'POST', module: 'router', endpointPath: '/users/identify', + metadata, }, ); @@ -517,7 +518,7 @@ async function process(event, processParams = { userStore: new Map() }, reqMetad const brazeExternalID = getDestinationExternalID(message, 'brazeExternalId') || message.userId; if (message.anonymousId && brazeExternalID) { - await processIdentify(message, destination); + await processIdentify(event); } else { collectStatsForAliasMissConfigurations(destination.ID); } diff --git a/src/v0/destinations/braze/util.js b/src/v0/destinations/braze/util.js index b3b29cdf96..63f7836088 100644 --- a/src/v0/destinations/braze/util.js +++ b/src/v0/destinations/braze/util.js @@ -142,7 +142,7 @@ const BrazeDedupUtility = { return identfierChunks; }, - async doApiLookup(identfierChunks, destination) { + async doApiLookup(identfierChunks, { destination, metadata }) { return Promise.all( identfierChunks.map(async (ids) => { const externalIdentifiers = ids.filter((id) => id.external_id); @@ -167,6 +167,7 @@ const BrazeDedupUtility = { requestMethod: 'POST', module: 'router', endpointPath: '/users/export/ids', + metadata, }, ); stats.counter('braze_lookup_failure_count', 1, { @@ -189,10 +190,10 @@ const BrazeDedupUtility = { */ async doLookup(inputs) { const lookupStartTime = new Date(); - const { destination } = inputs[0]; + const { destination, metadata } = inputs[0]; const { externalIdsToQuery, aliasIdsToQuery } = this.prepareInputForDedup(inputs); const identfierChunks = this.prepareChunksForDedup(externalIdsToQuery, aliasIdsToQuery); - const chunkedUserData = await this.doApiLookup(identfierChunks, destination); + const chunkedUserData = await this.doApiLookup(identfierChunks, { destination, metadata }); stats.timing('braze_lookup_time', lookupStartTime, { destination_id: destination.Config.destinationId, }); diff --git a/src/v0/destinations/gainsight/transform.js b/src/v0/destinations/gainsight/transform.js index f47296f066..815ef040b9 100644 --- a/src/v0/destinations/gainsight/transform.js +++ b/src/v0/destinations/gainsight/transform.js @@ -79,7 +79,7 @@ const identifyResponseBuilder = (message, { Config }) => { * Person Object. * https://support.gainsight.com/Gainsight_NXT/API_and_Developer_Docs/Company_API/Company_API_Documentation */ -const groupResponseBuilder = async (message, { Config }) => { +const groupResponseBuilder = async (message, { Config }, metadata) => { const { accessKey } = getConfigOrThrowError(Config, ['accessKey'], 'group'); const groupName = getValueFromMessage(message, 'traits.name'); if (!groupName) { @@ -91,7 +91,7 @@ const groupResponseBuilder = async (message, { Config }) => { throw new InstrumentationError('user email is required for group'); } - const resp = await searchGroup(groupName, Config); + const resp = await searchGroup(groupName, Config, metadata); let payload = constructPayload(message, groupMapping); const defaultKeys = Object.keys(payload); @@ -104,9 +104,9 @@ const groupResponseBuilder = async (message, { Config }) => { let groupGsid; if (resp.data.data.records.length === 0) { - groupGsid = await createGroup(payload, Config); + groupGsid = await createGroup(payload, Config, metadata); } else { - groupGsid = await updateGroup(payload, Config); + groupGsid = await updateGroup(payload, Config, metadata); } const responsePayload = { @@ -184,7 +184,7 @@ const trackResponseBuilder = (message, { Config }) => { * Processing Single event */ const process = async (event) => { - const { message, destination } = event; + const { message, destination, metadata } = event; if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -196,7 +196,7 @@ const process = async (event) => { response = identifyResponseBuilder(message, destination); break; case EventType.GROUP: - response = await groupResponseBuilder(message, destination); + response = await groupResponseBuilder(message, destination, metadata); break; case EventType.TRACK: response = trackResponseBuilder(message, destination); diff --git a/src/v0/destinations/gainsight/util.js b/src/v0/destinations/gainsight/util.js index 4c7fd58193..afd1266b21 100644 --- a/src/v0/destinations/gainsight/util.js +++ b/src/v0/destinations/gainsight/util.js @@ -10,7 +10,7 @@ const { ENDPOINTS, getLookupPayload } = require('./config'); const tags = require('../../util/tags'); const { JSON_MIME_TYPE } = require('../../util/constant'); -const searchGroup = async (groupName, Config) => { +const searchGroup = async (groupName, Config, metadata) => { let resp; try { resp = await myAxios.post( @@ -28,6 +28,7 @@ const searchGroup = async (groupName, Config) => { requestMethod: 'POST', endpointPath: '/data/objects/query/Company', module: 'router', + metadata, }, ); } catch (error) { @@ -48,7 +49,7 @@ const searchGroup = async (groupName, Config) => { return resp; }; -const createGroup = async (payload, Config) => { +const createGroup = async (payload, Config, metadata) => { let resp; try { resp = await myAxios.post( @@ -68,6 +69,7 @@ const createGroup = async (payload, Config) => { requestMethod: 'POST', endpointPath: '/data/objects/Company', module: 'router', + metadata, }, ); } catch (error) { @@ -88,7 +90,7 @@ const createGroup = async (payload, Config) => { return resp.data.data.records[0].Gsid; }; -const updateGroup = async (payload, Config) => { +const updateGroup = async (payload, Config, metadata) => { let resp; try { resp = await myAxios.put( @@ -111,6 +113,7 @@ const updateGroup = async (payload, Config) => { requestMethod: 'PUT', endpointPath: '/data/objects/Company', module: 'router', + metadata, }, ); } catch (error) { diff --git a/src/v0/destinations/hs/HSTransform-v1.js b/src/v0/destinations/hs/HSTransform-v1.js index 51feebea74..ed94bd7c17 100644 --- a/src/v0/destinations/hs/HSTransform-v1.js +++ b/src/v0/destinations/hs/HSTransform-v1.js @@ -51,7 +51,7 @@ const { JSON_MIME_TYPE } = require('../../util/constant'); * @param {*} propertyMap * @returns */ -const processLegacyIdentify = async (message, destination, propertyMap) => { +const processLegacyIdentify = async ({ message, destination, metadata }, propertyMap) => { const { Config } = destination; let traits = getFieldValueFromMessage(message, 'traits'); const mappedToDestination = get(message, MappedToDestinationKey); @@ -82,7 +82,7 @@ const processLegacyIdentify = async (message, destination, propertyMap) => { response.method = defaultPatchRequestConfig.requestMethod; } - traits = await populateTraits(propertyMap, traits, destination); + traits = await populateTraits(propertyMap, traits, destination, metadata); response.body.JSON = removeUndefinedAndNullValues({ properties: traits }); response.source = 'rETL'; response.operation = operation; @@ -92,7 +92,10 @@ const processLegacyIdentify = async (message, destination, propertyMap) => { } const { email } = traits; - const userProperties = await getTransformedJSON(message, destination, propertyMap); + const userProperties = await getTransformedJSON( + { message, destination, metadata }, + propertyMap, + ); const payload = { properties: formatPropertyValueForIdentify(userProperties), @@ -134,7 +137,7 @@ const processLegacyIdentify = async (message, destination, propertyMap) => { * @param {*} propertyMap * @returns */ -const processLegacyTrack = async (message, destination, propertyMap) => { +const processLegacyTrack = async ({ message, destination, metadata }, propertyMap) => { const { Config } = destination; if (!Config.hubID) { @@ -151,7 +154,7 @@ const processLegacyTrack = async (message, destination, propertyMap) => { id: getDestinationExternalID(message, 'hubspotId'), }; - const userProperties = await getTransformedJSON(message, destination, propertyMap); + const userProperties = await getTransformedJSON({ message, destination, metadata }, propertyMap); const payload = { ...parameters, ...userProperties }; const params = removeUndefinedAndNullValues(payload); diff --git a/src/v0/destinations/hs/HSTransform-v2.js b/src/v0/destinations/hs/HSTransform-v2.js index d2b26f1ab8..175830622b 100644 --- a/src/v0/destinations/hs/HSTransform-v2.js +++ b/src/v0/destinations/hs/HSTransform-v2.js @@ -68,7 +68,7 @@ const addHsAuthentication = (response, Config) => { * @param {*} propertyMap * @returns */ -const processIdentify = async (message, destination, propertyMap) => { +const processIdentify = async ({ message, destination, metadata }, propertyMap) => { const { Config } = destination; let traits = getFieldValueFromMessage(message, 'traits'); const mappedToDestination = get(message, MappedToDestinationKey); @@ -125,7 +125,7 @@ const processIdentify = async (message, destination, propertyMap) => { response.method = defaultPatchRequestConfig.requestMethod; } - traits = await populateTraits(propertyMap, traits, destination); + traits = await populateTraits(propertyMap, traits, destination, metadata); response.body.JSON = removeUndefinedAndNullValues({ properties: traits }); response.source = 'rETL'; response.operation = operation; @@ -138,10 +138,10 @@ const processIdentify = async (message, destination, propertyMap) => { // if contactId is not provided then search if (!contactId) { - contactId = await searchContacts(message, destination); + contactId = await searchContacts(message, destination, metadata); } - const properties = await getTransformedJSON(message, destination, propertyMap); + const properties = await getTransformedJSON({ message, destination, metadata }, propertyMap); const payload = { properties, @@ -187,7 +187,7 @@ const processIdentify = async (message, destination, propertyMap) => { * @param {*} destination * @returns */ -const processTrack = async (message, destination) => { +const processTrack = async ({ message, destination }) => { const { Config } = destination; let payload = constructPayload(message, mappingConfig[ConfigCategory.TRACK.name]); diff --git a/src/v0/destinations/hs/transform.js b/src/v0/destinations/hs/transform.js index 9eed244af4..190dac7cc5 100644 --- a/src/v0/destinations/hs/transform.js +++ b/src/v0/destinations/hs/transform.js @@ -17,7 +17,7 @@ const { validateDestinationConfig, } = require('./util'); -const processSingleMessage = async (message, destination, propertyMap) => { +const processSingleMessage = async ({ message, destination, metadata }, propertyMap) => { if (!message.type) { throw new InstrumentationError('Message type is not present. Aborting message.'); } @@ -30,18 +30,18 @@ const processSingleMessage = async (message, destination, propertyMap) => { case EventType.IDENTIFY: { response = []; if (destination.Config.apiVersion === API_VERSION.v3) { - response.push(await processIdentify(message, destination, propertyMap)); + response.push(await processIdentify({ message, destination, metadata }, propertyMap)); } else { // Legacy API - response.push(await processLegacyIdentify(message, destination, propertyMap)); + response.push(await processLegacyIdentify({ message, destination, metadata }, propertyMap)); } break; } case EventType.TRACK: if (destination.Config.apiVersion === API_VERSION.v3) { - response = await processTrack(message, destination, propertyMap); + response = await processTrack({ message, destination }, propertyMap); } else { - response = await processLegacyTrack(message, destination, propertyMap); + response = await processLegacyTrack({ message, destination, metadata }, propertyMap); } break; default: @@ -53,15 +53,19 @@ const processSingleMessage = async (message, destination, propertyMap) => { // has been deprecated - using routerTransform for both the versions const process = async (event) => { - const { destination, message } = event; + const { destination, message, metadata } = event; const mappedToDestination = get(message, MappedToDestinationKey); let events = []; events = [event]; if (mappedToDestination && GENERIC_TRUE_VALUES.includes(mappedToDestination?.toString())) { // get info about existing objects and splitting accordingly. - events = await splitEventsForCreateUpdate([event], destination); + events = await splitEventsForCreateUpdate([event], destination, metadata); } - return processSingleMessage(events[0].message, events[0].destination); + return processSingleMessage({ + message: events[0].message, + destination: events[0].destination, + metadata: events[0].metadata || metadata, + }); }; // we are batching by default at routerTransform @@ -71,7 +75,7 @@ const processRouterDest = async (inputs, reqMetadata) => { const successRespList = []; const errorRespList = []; // using the first destination config for transforming the batch - const { destination } = tempInputs[0]; + const { destination, metadata } = tempInputs[0]; let propertyMap; const mappedToDestination = get(tempInputs[0].message, MappedToDestinationKey); const { objectType } = getDestinationExternalIDInfoForRetl(tempInputs[0].message, 'HS'); @@ -80,9 +84,9 @@ const processRouterDest = async (inputs, reqMetadata) => { if (mappedToDestination && GENERIC_TRUE_VALUES.includes(mappedToDestination?.toString())) { // skip splitting the batches to inserts and updates if object it is an association if (objectType.toLowerCase() !== 'association') { - propertyMap = await getProperties(destination); + propertyMap = await getProperties(destination, metadata); // get info about existing objects and splitting accordingly. - tempInputs = await splitEventsForCreateUpdate(tempInputs, destination); + tempInputs = await splitEventsForCreateUpdate(tempInputs, destination, metadata); } } else { // reduce the no. of calls for properties endpoint @@ -90,7 +94,7 @@ const processRouterDest = async (inputs, reqMetadata) => { (input) => fetchFinalSetOfTraits(input.message) !== undefined, ); if (traitsFound) { - propertyMap = await getProperties(destination); + propertyMap = await getProperties(destination, metadata); } } } catch (error) { @@ -111,8 +115,7 @@ const processRouterDest = async (inputs, reqMetadata) => { } else { // event is not transformed let receivedResponse = await processSingleMessage( - input.message, - destination, + { message: input.message, destination, metadata: input.metadata }, propertyMap, ); diff --git a/src/v0/destinations/hs/util.js b/src/v0/destinations/hs/util.js index b30207fe15..38b2e636b9 100644 --- a/src/v0/destinations/hs/util.js +++ b/src/v0/destinations/hs/util.js @@ -90,7 +90,7 @@ const fetchFinalSetOfTraits = (message) => { * @param {*} destination * @returns */ -const getProperties = async (destination) => { +const getProperties = async (destination, metadata) => { let hubspotPropertyMap = {}; let hubspotPropertyMapResponse; const { Config } = destination; @@ -110,6 +110,7 @@ const getProperties = async (destination) => { endpointPath: `/properties/v1/contacts/properties`, requestMethod: 'GET', module: 'router', + metadata, }); hubspotPropertyMapResponse = processAxiosResponse(hubspotPropertyMapResponse); } else { @@ -124,6 +125,7 @@ const getProperties = async (destination) => { endpointPath: `/properties/v1/contacts/properties?hapikey`, requestMethod: 'GET', module: 'router', + metadata, }, ); hubspotPropertyMapResponse = processAxiosResponse(hubspotPropertyMapResponse); @@ -208,7 +210,7 @@ const getUTCMidnightTimeStampValue = (propValue) => { * @param {*} propertyMap * @returns */ -const getTransformedJSON = async (message, destination, propertyMap) => { +const getTransformedJSON = async ({ message, destination, metadata }, propertyMap) => { let rawPayload = {}; const traits = fetchFinalSetOfTraits(message); @@ -217,7 +219,7 @@ const getTransformedJSON = async (message, destination, propertyMap) => { if (!propertyMap) { // fetch HS properties // eslint-disable-next-line no-param-reassign - propertyMap = await getProperties(destination); + propertyMap = await getProperties(destination, metadata); } rawPayload = constructPayload(message, hsCommonConfigJson); @@ -325,7 +327,7 @@ const getLookupFieldValue = (message, lookupField) => { * @param {*} destination * @returns */ -const searchContacts = async (message, destination) => { +const searchContacts = async (message, destination, metadata) => { const { Config } = destination; let searchContactsResponse; let contactId; @@ -377,6 +379,7 @@ const searchContacts = async (message, destination) => { endpointPath, requestMethod: 'POST', module: 'router', + metadata, }, ); searchContactsResponse = processAxiosResponse(searchContactsResponse); @@ -389,6 +392,7 @@ const searchContacts = async (message, destination) => { endpointPath, requestMethod: 'POST', module: 'router', + metadata, }); searchContactsResponse = processAxiosResponse(searchContactsResponse); } @@ -528,6 +532,7 @@ const performHubSpotSearch = async ( objectType, identifierType, destination, + metadata, ) => { let checkAfter = 1; const searchResults = []; @@ -556,6 +561,7 @@ const performHubSpotSearch = async ( endpointPath, requestMethod: 'POST', module: 'router', + metadata, }); const processedResponse = processAxiosResponse(searchResponse); @@ -655,7 +661,7 @@ const getRequestData = (identifierType, chunk) => { * @param {*} inputs * @param {*} destination */ -const getExistingContactsData = async (inputs, destination) => { +const getExistingContactsData = async (inputs, destination, metadata) => { const { Config } = destination; const hsIdsToBeUpdated = []; const firstMessage = inputs[0].message; @@ -683,6 +689,7 @@ const getExistingContactsData = async (inputs, destination) => { objectType, identifierType, destination, + metadata, ); if (searchResults.length > 0) { hsIdsToBeUpdated.push(...searchResults); @@ -728,9 +735,9 @@ const setHsSearchId = (input, id, useSecondaryProp = false) => { * For email as primary key we use `hs_additional_emails` as well property to search existing contacts * */ -const splitEventsForCreateUpdate = async (inputs, destination) => { +const splitEventsForCreateUpdate = async (inputs, destination, metadata) => { // get all the id and properties of already existing objects needed for update. - const hsIdsToBeUpdated = await getExistingContactsData(inputs, destination); + const hsIdsToBeUpdated = await getExistingContactsData(inputs, destination, metadata); const resultInput = inputs.map((input) => { const { message } = input; @@ -805,12 +812,12 @@ const getHsSearchId = (message) => { * @param {*} traits * @param {*} destination */ -const populateTraits = async (propertyMap, traits, destination) => { +const populateTraits = async (propertyMap, traits, destination, metadata) => { const populatedTraits = traits; let propertyToTypeMap = propertyMap; if (!propertyToTypeMap) { // fetch HS properties - propertyToTypeMap = await getProperties(destination); + propertyToTypeMap = await getProperties(destination, metadata); } const keys = Object.keys(populatedTraits); diff --git a/src/v0/destinations/marketo/transform.js b/src/v0/destinations/marketo/transform.js index b811596f95..accca1d449 100644 --- a/src/v0/destinations/marketo/transform.js +++ b/src/v0/destinations/marketo/transform.js @@ -55,7 +55,7 @@ const authCache = new Cache(AUTH_CACHE_TTL); // 1 hr // fails the transformer if auth fails // ------------------------ // Ref: https://developers.marketo.com/rest-api/authentication/#creating_an_access_token -const getAuthToken = async (formattedDestination) => +const getAuthToken = async (formattedDestination, metadata) => authCache.get(formattedDestination.ID, async () => { const { accountId, clientId, clientSecret } = formattedDestination; const clientResponse = await sendGetRequest( @@ -67,6 +67,7 @@ const getAuthToken = async (formattedDestination) => grant_type: 'client_credentials', }, }, + metadata, ); const data = marketoResponseHandler(clientResponse, 'During fetching auth token'); if (data) { @@ -92,7 +93,7 @@ const getAuthToken = async (formattedDestination) => // If lookupField is omitted, the default key is email. // ------------------------ // Thus we'll always be using createOrUpdate -const createOrUpdateLead = async (formattedDestination, token, userId, anonymousId) => +const createOrUpdateLead = async (formattedDestination, token, userId, anonymousId, metadata) => userIdLeadCache.get(userId || anonymousId, async () => { const attribute = userId ? { userId } : { anonymousId }; stats.increment(LEAD_LOOKUP_METRIC, { @@ -114,6 +115,7 @@ const createOrUpdateLead = async (formattedDestination, token, userId, anonymous 'Content-type': JSON_MIME_TYPE, }, }, + metadata, ); const data = getResponseHandlerData( clientResponse, @@ -135,7 +137,7 @@ const createOrUpdateLead = async (formattedDestination, token, userId, anonymous // ------------------------ // Ref: https://developers.marketo.com/rest-api/lead-database/leads/#create_and_update // ------------------------ -const lookupLeadUsingEmail = async (formattedDestination, token, email) => +const lookupLeadUsingEmail = async (formattedDestination, token, email, metadata) => emailLeadCache.get(email, async () => { stats.increment(LEAD_LOOKUP_METRIC, { type: 'email', action: 'fetch' }); const clientResponse = await sendGetRequest( @@ -145,6 +147,7 @@ const lookupLeadUsingEmail = async (formattedDestination, token, email) => params: { filterValues: email, filterType: 'email' }, headers: { Authorization: `Bearer ${token}` }, }, + metadata, ); const data = getResponseHandlerData( clientResponse, @@ -167,7 +170,7 @@ const lookupLeadUsingEmail = async (formattedDestination, token, email) => // ------------------------ // Ref: https://developers.marketo.com/rest-api/lead-database/leads/#create_and_update // ------------------------ -const lookupLeadUsingId = async (formattedDestination, token, userId, anonymousId) => +const lookupLeadUsingId = async (formattedDestination, token, userId, anonymousId, metadata) => userIdLeadCache.get(userId || anonymousId, async () => { stats.increment(LEAD_LOOKUP_METRIC, { type: 'userId', action: 'fetch' }); const clientResponse = await sendGetRequest( @@ -179,6 +182,7 @@ const lookupLeadUsingId = async (formattedDestination, token, userId, anonymousI }, headers: { Authorization: `Bearer ${token}` }, }, + metadata, ); const data = getResponseHandlerData( clientResponse, @@ -195,7 +199,7 @@ const lookupLeadUsingId = async (formattedDestination, token, userId, anonymousI return null; }); -const getLeadId = async (message, formattedDestination, token) => { +const getLeadId = async (message, formattedDestination, token, metadata) => { // precedence ->> // -> externalId (context.externalId[0].type == marketoLeadId) // -> lookup lead using email @@ -225,10 +229,16 @@ const getLeadId = async (message, formattedDestination, token) => { if (!leadId) { // search for lead using email if (email) { - leadId = await lookupLeadUsingEmail(formattedDestination, token, email); + leadId = await lookupLeadUsingEmail(formattedDestination, token, email, metadata); } else { // search lead using userId or anonymousId - leadId = await lookupLeadUsingId(formattedDestination, token, userId, message.anonymousId); + leadId = await lookupLeadUsingId( + formattedDestination, + token, + userId, + message.anonymousId, + metadata, + ); } } @@ -236,7 +246,13 @@ const getLeadId = async (message, formattedDestination, token) => { if (!leadId) { // check we have permission to create lead on marketo if (formattedDestination.createIfNotExist) { - leadId = await createOrUpdateLead(formattedDestination, token, userId, message.anonymousId); + leadId = await createOrUpdateLead( + formattedDestination, + token, + userId, + message.anonymousId, + metadata, + ); } else { throw new ConfigurationError('Lead creation is turned off on the dashboard'); } @@ -264,7 +280,7 @@ const getLeadId = async (message, formattedDestination, token) => { // ------------------------ // Almost same as leadId lookup. Noticable difference from lookup is we'll using // `id` i.e. leadId as lookupField at the end of it -const processIdentify = async (message, formattedDestination, token) => { +const processIdentify = async (message, formattedDestination, token, metadata) => { // If mapped to destination, Add externalId to traits if (get(message, MappedToDestinationKey)) { addExternalIdToTraits(message); @@ -277,7 +293,7 @@ const processIdentify = async (message, formattedDestination, token) => { throw new InstrumentationError('Invalid traits value for Marketo'); } - const leadId = await getLeadId(message, formattedDestination, token); + const leadId = await getLeadId(message, formattedDestination, token, metadata); let attribute = constructPayload(traits, identifyConfig); // leadTraitMapping will not be used if mapping is done through VDM in rETL @@ -338,7 +354,7 @@ const processIdentify = async (message, formattedDestination, token) => { // process track events - only mapped events // ------------------------ // Ref: https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Activities/addCustomActivityUsingPOST -const processTrack = async (message, formattedDestination, token) => { +const processTrack = async (message, formattedDestination, token, metadata) => { // check if trackAnonymousEvent is turned off and userId is not present - fail // check if the event is mapped in customActivityEventMap. if not - fail // get primaryKey name for the event @@ -373,7 +389,7 @@ const processTrack = async (message, formattedDestination, token) => { } // get leadId - const leadId = await getLeadId(message, formattedDestination, token); + const leadId = await getLeadId(message, formattedDestination, token, metadata); // handle addition of custom activity attributes // Reference: https://developers.marketo.com/rest-api/lead-database/activities/#add_custom_activities @@ -420,7 +436,7 @@ const responseWrapper = (response) => { return resp; }; -const processEvent = async (message, destination, token) => { +const processEvent = async ({ message, destination, metadata }, token) => { if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -430,10 +446,10 @@ const processEvent = async (message, destination, token) => { let response; switch (messageType) { case EventType.IDENTIFY: - response = await processIdentify(message, formattedDestination, token); + response = await processIdentify(message, formattedDestination, token, metadata); break; case EventType.TRACK: - response = await processTrack(message, formattedDestination, token); + response = await processTrack(message, formattedDestination, token, metadata); break; default: throw new InstrumentationError('Message type not supported'); @@ -444,11 +460,11 @@ const processEvent = async (message, destination, token) => { }; const process = async (event) => { - const token = await getAuthToken(formatConfig(event.destination)); + const token = await getAuthToken(formatConfig(event.destination), event.metadata); if (!token) { throw new UnauthorizedError('Authorization failed'); } - const response = await processEvent(event.message, event.destination, token); + const response = await processEvent(event, token); return response; }; @@ -457,7 +473,7 @@ const processRouterDest = async (inputs, reqMetadata) => { // If destination information is not present Error should be thrown let token; try { - token = await getAuthToken(formatConfig(inputs[0].destination)); + token = await getAuthToken(formatConfig(inputs[0].destination), inputs[0].metadata); // If token is null track/identify calls cannot be executed. if (!token) { @@ -483,7 +499,7 @@ const processRouterDest = async (inputs, reqMetadata) => { inputs.map(async (input) => { try { return getSuccessRespEvents( - await processEvent(input.message, input.destination, token), + await processEvent(input, token), [input.metadata], input.destination, ); diff --git a/src/v0/destinations/marketo/util.js b/src/v0/destinations/marketo/util.js index b3a24fb411..aee872efac 100644 --- a/src/v0/destinations/marketo/util.js +++ b/src/v0/destinations/marketo/util.js @@ -243,13 +243,14 @@ const marketoResponseHandler = ( * @param {*} options * @returns { response, status } */ -const sendGetRequest = async (url, options) => { +const sendGetRequest = async (url, options, metadata) => { const clientResponse = await httpGET(url, options, { destType: 'marketo', feature: 'transformation', endpointPath: `/v1/leads`, requestMethod: 'GET', module: 'router', + metadata, }); const processedResponse = processAxiosResponse(clientResponse); return processedResponse; @@ -261,13 +262,14 @@ const sendGetRequest = async (url, options) => { * @param {*} options * @returns { response, status } */ -const sendPostRequest = async (url, data, options) => { +const sendPostRequest = async (url, data, options, metadata) => { const clientResponse = await httpPOST(url, data, options, { destType: 'marketo', feature: 'transformation', endpointPath: `/v1/leads`, requestMethod: 'POST', module: 'router', + metadata, }); const processedResponse = processAxiosResponse(clientResponse); return processedResponse; diff --git a/src/v0/destinations/marketo_static_list/transform.js b/src/v0/destinations/marketo_static_list/transform.js index 92c137c614..810b528bbf 100644 --- a/src/v0/destinations/marketo_static_list/transform.js +++ b/src/v0/destinations/marketo_static_list/transform.js @@ -93,7 +93,7 @@ const processEvent = (event) => { // eslint-disable-next-line @typescript-eslint/no-unused-vars const process = async (event, _processParams) => { - const token = await getAuthToken(formatConfig(event.destination)); + const token = await getAuthToken(formatConfig(event.destination), event.metadata); if (!token) { throw new UnauthorizedError('Authorization failed'); } @@ -106,9 +106,9 @@ const processRouterDest = async (inputs, reqMetadata) => { // Token needs to be generated for marketo which will be done on input level. // If destination information is not present Error should be thrown - const { destination } = inputs[0]; + const { destination, metadata } = inputs[0]; try { - const token = await getAuthToken(formatConfig(destination)); + const token = await getAuthToken(formatConfig(destination), metadata); if (!token) { throw new UnauthorizedError('Could not retrieve authorisation token'); } diff --git a/src/v0/destinations/pardot/networkHandler.js b/src/v0/destinations/pardot/networkHandler.js index 60d2f7ee23..6344301d39 100644 --- a/src/v0/destinations/pardot/networkHandler.js +++ b/src/v0/destinations/pardot/networkHandler.js @@ -118,6 +118,7 @@ const prepareProxyReq = (request) => { * @returns */ const pardotProxyRequest = async (request) => { + const { metadata } = request; const { endpoint, data, method, params, headers } = prepareProxyReq(request); const requestOptions = { @@ -130,6 +131,7 @@ const pardotProxyRequest = async (request) => { const response = await httpSend(requestOptions, { feature: 'proxy', destType: 'pardot', + metadata, }); return response; }; diff --git a/src/v0/destinations/rakuten/networkHandler.js b/src/v0/destinations/rakuten/networkHandler.js index 4c97a23e51..95189cab62 100644 --- a/src/v0/destinations/rakuten/networkHandler.js +++ b/src/v0/destinations/rakuten/networkHandler.js @@ -10,6 +10,7 @@ const { HTTP_STATUS_CODES } = require('../../util/constant'); const DESTINATION = 'RAKUTEN'; const prepareProxyRequest = (request) => request; const proxyRequest = async (request, destType) => { + const { metadata } = request; const { endpoint, data, method, params, headers } = prepareProxyRequest(request); const requestOptions = { url: endpoint, @@ -21,6 +22,7 @@ const proxyRequest = async (request, destType) => { const response = await httpSend(requestOptions, { feature: 'proxy', destType, + metadata, endpointPath: '/ep', requestMethod: 'GET', module: 'dataDelivery', diff --git a/src/v0/destinations/snapchat_custom_audience/networkHandler.js b/src/v0/destinations/snapchat_custom_audience/networkHandler.js index 6044216293..da2a021345 100644 --- a/src/v0/destinations/snapchat_custom_audience/networkHandler.js +++ b/src/v0/destinations/snapchat_custom_audience/networkHandler.js @@ -31,6 +31,7 @@ const prepareProxyReq = (request) => { }; const scAudienceProxyRequest = async (request) => { + const { metadata } = request; const { endpoint, data, method, params, headers } = prepareProxyReq(request); const requestOptions = { @@ -46,6 +47,7 @@ const scAudienceProxyRequest = async (request) => { endpointPath: '/segments/segmentId/users', requestMethod: requestOptions?.method, module: 'dataDelivery', + metadata, }); return response; }; diff --git a/src/v0/destinations/the_trade_desk/networkHandler.js b/src/v0/destinations/the_trade_desk/networkHandler.js index aebbfc0785..d04b5216b0 100644 --- a/src/v0/destinations/the_trade_desk/networkHandler.js +++ b/src/v0/destinations/the_trade_desk/networkHandler.js @@ -10,6 +10,7 @@ const tags = require('../../util/tags'); const { JSON_MIME_TYPE } = require('../../util/constant'); const proxyRequest = async (request) => { + const { metadata } = request; const { endpoint, data, method, params, headers, config } = prepareProxyRequest(request); if (!config?.advertiserSecretKey) { @@ -43,6 +44,7 @@ const proxyRequest = async (request) => { endpointPath: '/track/realtimeconversion', requestMethod: 'POST', module: 'dataDelivery', + metadata, }); return response; }; diff --git a/src/v0/destinations/user/transform.js b/src/v0/destinations/user/transform.js index ed04f5ccd4..24baadd200 100644 --- a/src/v0/destinations/user/transform.js +++ b/src/v0/destinations/user/transform.js @@ -43,23 +43,24 @@ const responseBuilder = async (payload, endpoint, method, apiKey) => { throw new TransformationError('Something went wrong while constructing the payload'); }; -const identifyResponseBuilder = async (message, destination) => { +const identifyResponseBuilder = async (event) => { + const { destination } = event; let builder; - const user = await retrieveUserFromLookup(message, destination); + const user = await retrieveUserFromLookup(event); const { Config } = destination; const { apiKey } = Config; // If user already exist we will update it else creates a new user if (!user) { - builder = createOrUpdateUserPayloadBuilder(message, destination); + builder = createOrUpdateUserPayloadBuilder(event); } else { const { id } = user; - builder = createOrUpdateUserPayloadBuilder(message, destination, id); + builder = createOrUpdateUserPayloadBuilder(event, id); } const { payload, endpoint, method } = builder; return responseBuilder(payload, endpoint, method, apiKey); }; -const trackResponseBuilder = async (message, destination) => { +const trackResponseBuilder = async ({ message, destination, metadata }) => { if (!message.event) { throw new InstrumentationError('Parameter event is required'); } @@ -68,7 +69,7 @@ const trackResponseBuilder = async (message, destination) => { let endpoint; let method; let builder; - const user = await retrieveUserFromLookup(message, destination); + const user = await retrieveUserFromLookup({ message, destination, metadata }); const { Config } = destination; const { apiKey, appSubdomain } = Config; if (user) { @@ -85,12 +86,12 @@ const trackResponseBuilder = async (message, destination) => { ); }; -const pageResponseBuilder = async (message, destination) => { +const pageResponseBuilder = async ({ message, destination, metadata }) => { let payload; let endpoint; let method; let builder; - const user = await retrieveUserFromLookup(message, destination); + const user = await retrieveUserFromLookup({ message, destination, metadata }); const { Config } = destination; const { apiKey, appSubdomain } = Config; if (user) { @@ -106,14 +107,14 @@ const pageResponseBuilder = async (message, destination) => { ); }; -const groupResponseBuilder = async (message, destination) => { +const groupResponseBuilder = async ({ message, destination, metadata }) => { validateGroupPayload(message); let payload; let endpoint; let method; let builder; - const user = await getUserByCustomId(message, destination); + const user = await getUserByCustomId(message, destination, metadata); const { Config } = destination; const { apiKey, appSubdomain } = Config; /* @@ -121,11 +122,11 @@ const groupResponseBuilder = async (message, destination) => { * user does not exist -> throw an error */ if (user) { - let company = await getCompanyByCustomId(message, destination); + let company = await getCompanyByCustomId(message, destination, metadata); if (!company) { - company = await createCompany(message, destination); + company = await createCompany(message, destination, metadata); } else { - company = await updateCompany(message, destination, company); + company = await updateCompany(message, destination, company, metadata); } builder = addUserToCompanyPayloadBuilder(user, company); payload = builder.payload; @@ -137,7 +138,8 @@ const groupResponseBuilder = async (message, destination) => { throw new NetworkInstrumentationError('No user found with given userId'); }; -const processEvent = async (message, destination) => { +const processEvent = async (event) => { + const { message } = event; // Validating if message type is even given or not if (!message.type) { throw new InstrumentationError('Event type is required'); @@ -146,16 +148,16 @@ const processEvent = async (message, destination) => { let response; switch (messageType) { case EventType.IDENTIFY: - response = await identifyResponseBuilder(message, destination); + response = await identifyResponseBuilder(event); break; case EventType.GROUP: - response = await groupResponseBuilder(message, destination); + response = await groupResponseBuilder(event); break; case EventType.TRACK: - response = await trackResponseBuilder(message, destination); + response = await trackResponseBuilder(event); break; case EventType.PAGE: - response = await pageResponseBuilder(message, destination); + response = await pageResponseBuilder(event); break; default: throw new InstrumentationError(`Event type ${messageType} is not supported`); @@ -163,7 +165,7 @@ const processEvent = async (message, destination) => { return response; }; -const process = async (event) => processEvent(event.message, event.destination); +const process = async (event) => processEvent(event); const processRouterDest = async (inputs, reqMetadata) => { const respList = await simpleProcessRouterDest(inputs, process, reqMetadata); diff --git a/src/v0/destinations/user/utils.js b/src/v0/destinations/user/utils.js index f332d7a4a7..f469d123d8 100644 --- a/src/v0/destinations/user/utils.js +++ b/src/v0/destinations/user/utils.js @@ -211,7 +211,7 @@ const validateGroupPayload = (message) => { * @param {*} destination * @returns */ -const createCompany = async (message, destination) => { +const createCompany = async (message, destination, metadata) => { const commonCompanyPropertiesPayload = constructPayload( message, MAPPING_CONFIG[CONFIG_CATEGORIES.CREATE_COMPANY.name], @@ -240,6 +240,7 @@ const createCompany = async (message, destination) => { endpointPath: `/companies/`, requestMethod: 'POST', module: 'router', + metadata, }); const data = processAxiosResponse(response); return data.response; @@ -253,7 +254,7 @@ const createCompany = async (message, destination) => { * @param {*} company * @returns */ -const updateCompany = async (message, destination, company) => { +const updateCompany = async (message, destination, company, metadata) => { const commonCompanyPropertiesPayload = constructPayload( message, MAPPING_CONFIG[CONFIG_CATEGORIES.UPDATE_COMPANY.name], @@ -283,6 +284,7 @@ const updateCompany = async (message, destination, company) => { endpointPath: `/companies/`, requestMethod: 'PUT', module: 'router', + metadata, }); const data = processAxiosResponse(response); return data.response; @@ -296,7 +298,7 @@ const updateCompany = async (message, destination, company) => { * @param {*} appSubdomain * @returns */ -const getUserByUserKey = async (apiKey, userKey, appSubdomain) => { +const getUserByUserKey = async (apiKey, userKey, appSubdomain, metadata) => { const endpoint = prepareUrl(`${BASE_ENDPOINT}/users/search/?key=${userKey}`, appSubdomain); const requestOptions = { headers: { @@ -312,6 +314,7 @@ const getUserByUserKey = async (apiKey, userKey, appSubdomain) => { endpointPath: `/users/search`, requestMethod: 'GET', module: 'router', + metadata, }); const processedUserResponse = processAxiosResponse(userResponse); if (processedUserResponse.status === 200) { @@ -328,7 +331,7 @@ const getUserByUserKey = async (apiKey, userKey, appSubdomain) => { * @param {*} appSubdomain * @returns */ -const getUserByEmail = async (apiKey, email, appSubdomain) => { +const getUserByEmail = async (apiKey, email, appSubdomain, metadata) => { if (!email) { throw new InstrumentationError('Lookup field : email value is not present'); } @@ -348,6 +351,7 @@ const getUserByEmail = async (apiKey, email, appSubdomain) => { endpointPath: `/users/search/?email`, requestMethod: 'GET', module: 'router', + metadata, }); const processedUserResponse = processAxiosResponse(userResponse); @@ -366,7 +370,7 @@ const getUserByEmail = async (apiKey, email, appSubdomain) => { * @param {*} appSubdomain * @returns */ -const getUserByPhoneNumber = async (apiKey, phoneNumber, appSubdomain) => { +const getUserByPhoneNumber = async (apiKey, phoneNumber, appSubdomain, metadata) => { if (!phoneNumber) { throw new InstrumentationError('Lookup field : phone value is not present'); } @@ -389,6 +393,7 @@ const getUserByPhoneNumber = async (apiKey, phoneNumber, appSubdomain) => { endpointPath: `/users/search/?phone_number`, requestMethod: 'GET', module: 'router', + metadata, }); const processedUserResponse = processAxiosResponse(userResponse); @@ -415,7 +420,7 @@ const getUserByPhoneNumber = async (apiKey, phoneNumber, appSubdomain) => { * @param {*} destination * @returns */ -const getUserByCustomId = async (message, destination) => { +const getUserByCustomId = async (message, destination, metadata) => { const { Config } = destination; const { appSubdomain, apiKey } = Config; const userCustomId = getFieldValueFromMessage(message, 'userId'); @@ -436,6 +441,7 @@ const getUserByCustomId = async (message, destination) => { endpointPath: `/users-by-id/`, requestMethod: 'GET', module: 'router', + metadata, }); const processedUserResponse = processAxiosResponse(userResponse); @@ -453,7 +459,7 @@ const getUserByCustomId = async (message, destination) => { * @param {*} destination * @returns */ -const getCompanyByCustomId = async (message, destination) => { +const getCompanyByCustomId = async (message, destination, metadata) => { const { Config } = destination; const { appSubdomain, apiKey } = Config; const companyCustomId = getFieldValueFromMessage(message, 'groupId'); @@ -474,6 +480,7 @@ const getCompanyByCustomId = async (message, destination) => { endpointPath: `/companies-by-id/`, requestMethod: 'GET', module: 'router', + metadata, }); const processedUserResponse = processAxiosResponse(response); if (processedUserResponse.status === 200) { @@ -490,12 +497,12 @@ const getCompanyByCustomId = async (message, destination) => { * @param {*} destination * @returns */ -const retrieveUserFromLookup = async (message, destination) => { +const retrieveUserFromLookup = async ({ message, destination, metadata }) => { const { Config } = destination; const { appSubdomain, apiKey } = Config; const userKey = getDestinationExternalID(message, 'userKey'); if (isDefinedAndNotNullAndNotEmpty(userKey)) { - return getUserByUserKey(apiKey, userKey, appSubdomain); + return getUserByUserKey(apiKey, userKey, appSubdomain, metadata); } const integrationsObj = getIntegrationsObj(message, 'user'); @@ -504,11 +511,11 @@ const retrieveUserFromLookup = async (message, destination) => { const lookupFieldValue = getFieldValueFromMessage(message, lookupField); if (lookupField === 'email') { - return getUserByEmail(apiKey, lookupFieldValue, appSubdomain); + return getUserByEmail(apiKey, lookupFieldValue, appSubdomain, metadata); } if (lookupField === 'phone') { - return getUserByPhoneNumber(apiKey, lookupFieldValue, appSubdomain); + return getUserByPhoneNumber(apiKey, lookupFieldValue, appSubdomain, metadata); } throw new InstrumentationError( @@ -517,11 +524,11 @@ const retrieveUserFromLookup = async (message, destination) => { } else { const userId = getValueFromMessage(message, 'userId'); if (userId) { - return getUserByCustomId(message, destination); + return getUserByCustomId(message, destination, metadata); } const email = getFieldValueFromMessage(message, 'email'); if (isDefinedAndNotNullAndNotEmpty(email)) { - return getUserByEmail(apiKey, email, appSubdomain); + return getUserByEmail(apiKey, email, appSubdomain, metadata); } throw new InstrumentationError('Default lookup field : email value is empty'); @@ -535,7 +542,7 @@ const retrieveUserFromLookup = async (message, destination) => { * @param {*} id * @returns */ -const createOrUpdateUserPayloadBuilder = (message, destination, id = null) => { +const createOrUpdateUserPayloadBuilder = ({ message, destination }, id = null) => { const { appSubdomain } = destination.Config; const commonUserPropertiesPayload = constructPayload( message, diff --git a/src/v0/destinations/yahoo_dsp/transform.js b/src/v0/destinations/yahoo_dsp/transform.js index 4cd1eee73d..f11f1629a8 100644 --- a/src/v0/destinations/yahoo_dsp/transform.js +++ b/src/v0/destinations/yahoo_dsp/transform.js @@ -21,7 +21,7 @@ const { JSON_MIME_TYPE } = require('../../util/constant'); * @param {*} destination * @returns */ -const responseBuilder = async (message, destination) => { +const responseBuilder = async (message, destination, metadata) => { let dspListPayload = {}; const { Config } = destination; const { listData } = message.properties; @@ -72,7 +72,7 @@ const responseBuilder = async (message, destination) => { response.endpoint = `${BASE_ENDPOINT}/traffic/audiences/${ENDPOINTS[audienceType]}/${audienceId}`; response.body.JSON = removeUndefinedAndNullValues(dspListPayload); response.method = defaultPutRequestConfig.requestMethod; - const accessToken = await getAccessToken(destination); + const accessToken = await getAccessToken(destination, metadata); response.headers = { 'X-Auth-Token': accessToken, 'X-Auth-Method': 'OAuth2', @@ -81,7 +81,7 @@ const responseBuilder = async (message, destination) => { return response; }; -const processEvent = async (message, destination) => { +const processEvent = async ({ message, destination, metadata }) => { let response; if (!message.type) { throw new InstrumentationError('Event type is required'); @@ -93,14 +93,14 @@ const processEvent = async (message, destination) => { throw new InstrumentationError('listData is not present inside properties. Aborting message'); } if (message.type.toLowerCase() === 'audiencelist') { - response = await responseBuilder(message, destination); + response = await responseBuilder(message, destination, metadata); } else { throw new InstrumentationError(`Event type ${message.type} is not supported`, 400); } return response; }; -const process = async (event) => processEvent(event.message, event.destination); +const process = async (event) => processEvent(event); const processRouterDest = async (inputs, reqMetadata) => { const respList = await simpleProcessRouterDest(inputs, process, reqMetadata); diff --git a/src/v0/destinations/yahoo_dsp/util.js b/src/v0/destinations/yahoo_dsp/util.js index 54002a3bce..ba19ac7725 100644 --- a/src/v0/destinations/yahoo_dsp/util.js +++ b/src/v0/destinations/yahoo_dsp/util.js @@ -95,7 +95,7 @@ const createPayload = (audienceList, Config) => { * @param {*} destination * @returns */ -const getAccessToken = async (destination) => { +const getAccessToken = async (destination, metadata) => { const { clientId, clientSecret } = destination.Config; const accessTokenKey = destination.ID; @@ -140,6 +140,7 @@ const getAccessToken = async (destination) => { endpointPath: '/identity/oauth2/access_token', requestMethod: 'POST', module: 'router', + metadata, }); // If the request fails, throwing error. if (dspAuthorisationData.success === false) { diff --git a/src/v0/destinations/zendesk/transform.js b/src/v0/destinations/zendesk/transform.js index cadb1d3964..792e8df350 100644 --- a/src/v0/destinations/zendesk/transform.js +++ b/src/v0/destinations/zendesk/transform.js @@ -95,7 +95,13 @@ const responseBuilderToUpdatePrimaryAccount = ( * @param {*} headers -> Authorizations for API's call * @returns it return payloadbuilder for updating email */ -const payloadBuilderforUpdatingEmail = async (userId, headers, userEmail, baseEndpoint) => { +const payloadBuilderforUpdatingEmail = async ( + userId, + headers, + userEmail, + baseEndpoint, + metadata, +) => { // url for list all identities of user const url = `${baseEndpoint}users/${userId}/identities`; const config = { headers }; @@ -106,6 +112,7 @@ const payloadBuilderforUpdatingEmail = async (userId, headers, userEmail, baseEn endpointPath: 'users/userId/identities', requestMethod: 'POST', module: 'router', + metadata, }); if (res?.response?.data?.count > 0) { const { identities } = res.response.data; @@ -131,7 +138,7 @@ const payloadBuilderforUpdatingEmail = async (userId, headers, userEmail, baseEn return {}; }; -async function createUserFields(url, config, newFields, fieldJson) { +async function createUserFields(url, config, newFields, fieldJson, metadata) { let fieldData; // removing trailing 's' from fieldJson const fieldJsonSliced = fieldJson.slice(0, -1); @@ -154,6 +161,7 @@ async function createUserFields(url, config, newFields, fieldJson) { endpointPath: '/users/userId/identities', requestMethod: 'POST', module: 'router', + metadata, }); if (response.status !== 201) { logger.debug(`${NAME}:: Failed to create User Field : `, field); @@ -173,6 +181,7 @@ async function checkAndCreateUserFields( fieldJson, headers, baseEndpoint, + metadata, ) { let newFields = []; @@ -185,6 +194,7 @@ async function checkAndCreateUserFields( feature: 'transformation', requestMethod: 'POST', module: 'router', + metadata, }); const fields = get(response.data, fieldJson); if (response.data && fields) { @@ -199,7 +209,7 @@ async function checkAndCreateUserFields( ); if (newFields.length > 0) { - await createUserFields(url, config, newFields, fieldJson); + await createUserFields(url, config, newFields, fieldJson, metadata); } } } catch (error) { @@ -249,7 +259,7 @@ function getIdentifyPayload(message, category, destinationConfig, type) { * @param {*} headers headers for authorizations * @returns */ -const getUserIdByExternalId = async (message, headers, baseEndpoint) => { +const getUserIdByExternalId = async (message, headers, baseEndpoint, metadata) => { const externalId = getFieldValueFromMessage(message, 'userIdOnly'); if (!externalId) { logger.debug(`${NAME}:: externalId is required for getting zenuserId`); @@ -265,6 +275,7 @@ const getUserIdByExternalId = async (message, headers, baseEndpoint) => { endpointPath, requestMethod: 'GET', module: 'router', + metadata, }); if (resp?.response?.data?.count > 0) { @@ -278,7 +289,7 @@ const getUserIdByExternalId = async (message, headers, baseEndpoint) => { return undefined; }; -async function getUserId(message, headers, baseEndpoint, type) { +async function getUserId(message, headers, baseEndpoint, type, metadata) { const traits = type === 'group' ? get(message, CONTEXT_TRAITS_KEY_PATH) @@ -298,6 +309,7 @@ async function getUserId(message, headers, baseEndpoint, type) { endpointPath, requestMethod: 'GET', module: 'router', + metadata, }); if (!resp || !resp.data || resp.data.count === 0) { logger.debug(`${NAME}:: User not found`); @@ -315,7 +327,7 @@ async function getUserId(message, headers, baseEndpoint, type) { } } -async function isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint) { +async function isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint, metadata) { const url = `${baseEndpoint}/users/${userId}/organization_memberships.json`; const config = { headers }; try { @@ -325,6 +337,7 @@ async function isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint) { endpointPath: '/users/userId/organization_memberships.json', requestMethod: 'GET', module: 'router', + metadata, }); if (response?.data?.organization_memberships?.[0]?.organization_id === orgId) { return true; @@ -336,7 +349,7 @@ async function isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint) { return false; } -async function createUser(message, headers, destinationConfig, baseEndpoint, type) { +async function createUser(message, headers, destinationConfig, baseEndpoint, type, metadata) { const traits = type === 'group' ? get(message, CONTEXT_TRAITS_KEY_PATH) @@ -360,6 +373,7 @@ async function createUser(message, headers, destinationConfig, baseEndpoint, typ endpointPath: '/users/create_or_update.json', requestMethod: 'POST', module: 'router', + metadata, }); if (!resp.data || !resp.data.user || !resp.data.user.id) { @@ -377,9 +391,16 @@ async function createUser(message, headers, destinationConfig, baseEndpoint, typ } } -async function getUserMembershipPayload(message, headers, orgId, destinationConfig, baseEndpoint) { +async function getUserMembershipPayload( + message, + headers, + orgId, + destinationConfig, + baseEndpoint, + metadata, +) { // let zendeskUserID = await getUserId(message.userId, headers); - let zendeskUserID = await getUserId(message, headers, baseEndpoint, 'group'); + let zendeskUserID = await getUserId(message, headers, baseEndpoint, 'group', metadata); const traits = get(message, CONTEXT_TRAITS_KEY_PATH); if (!zendeskUserID) { if (traits && traits.name && traits.email) { @@ -389,6 +410,7 @@ async function getUserMembershipPayload(message, headers, orgId, destinationConf destinationConfig, baseEndpoint, 'group', + metadata, ); zendeskUserID = zendeskUserId; } else { @@ -405,7 +427,14 @@ async function getUserMembershipPayload(message, headers, orgId, destinationConf return payload; } -async function createOrganization(message, category, headers, destinationConfig, baseEndpoint) { +async function createOrganization( + message, + category, + headers, + destinationConfig, + baseEndpoint, + metadata, +) { if (!isDefinedAndNotNull(message.traits)) { throw new InstrumentationError('Organisation Traits are missing. Aborting.'); } @@ -415,6 +444,7 @@ async function createOrganization(message, category, headers, destinationConfig, category.organizationFieldsJson, headers, baseEndpoint, + metadata, ); const mappingJson = mappingConfig[category.name]; const payload = constructPayload(message, mappingJson); @@ -447,6 +477,7 @@ async function createOrganization(message, category, headers, destinationConfig, endpointPath: '/organizations/create_or_update.json', requestMethod: 'POST', module: 'router', + metadata, }); if (!resp.data || !resp.data.organization) { @@ -468,7 +499,7 @@ function validateUserId(message) { } } -async function processIdentify(message, destinationConfig, headers, baseEndpoint) { +async function processIdentify(message, destinationConfig, headers, baseEndpoint, metadata) { validateUserId(message); const category = ConfigCategory.IDENTIFY; const traits = getFieldValueFromMessage(message, 'traits'); @@ -480,6 +511,7 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint category.userFieldsJson, headers, baseEndpoint, + metadata, ); const payload = getIdentifyPayload(message, category, destinationConfig, 'identify'); @@ -487,7 +519,12 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint const returnList = []; if (destinationConfig.searchByExternalId) { - const userIdByExternalId = await getUserIdByExternalId(message, headers, baseEndpoint); + const userIdByExternalId = await getUserIdByExternalId( + message, + headers, + baseEndpoint, + metadata, + ); const userEmail = traits?.email; if (userIdByExternalId && userEmail) { const payloadForUpdatingEmail = await payloadBuilderforUpdatingEmail( @@ -495,6 +532,7 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint headers, userEmail, baseEndpoint, + metadata, ); if (!isEmptyObject(payloadForUpdatingEmail)) returnList.push(payloadForUpdatingEmail); } @@ -507,7 +545,7 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint traits.company.id ) { const orgId = traits.company.id; - const userId = await getUserId(message, headers, baseEndpoint); + const userId = await getUserId(message, headers, baseEndpoint, metadata); if (userId) { const membershipUrl = `${baseEndpoint}users/${userId}/organization_memberships.json`; try { @@ -518,6 +556,7 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint endpointPath: '/users/userId/organization_memberships.json', requestMethod: 'GET', module: 'router', + metadata, }); if ( response.data && @@ -547,7 +586,7 @@ async function processIdentify(message, destinationConfig, headers, baseEndpoint return returnList; } -async function processTrack(message, destinationConfig, headers, baseEndpoint) { +async function processTrack(message, destinationConfig, headers, baseEndpoint, metadata) { validateUserId(message); const traits = getFieldValueFromMessage(message, 'traits'); let userEmail; @@ -568,6 +607,7 @@ async function processTrack(message, destinationConfig, headers, baseEndpoint) { endpointPath, requestMethod: 'GET', module: 'router', + metadata, }); if (!get(userResponse, 'data.users.0.id') || userResponse.data.count === 0) { const { zendeskUserId, email } = await createUser( @@ -575,6 +615,7 @@ async function processTrack(message, destinationConfig, headers, baseEndpoint) { headers, destinationConfig, baseEndpoint, + metadata, ); if (!zendeskUserId) { throw new NetworkInstrumentationError('User not found'); @@ -618,13 +659,20 @@ async function processTrack(message, destinationConfig, headers, baseEndpoint) { return response; } -async function processGroup(message, destinationConfig, headers, baseEndpoint) { +async function processGroup(message, destinationConfig, headers, baseEndpoint, metadata) { const category = ConfigCategory.GROUP; let payload; let url; if (destinationConfig.sendGroupCallsWithoutUserId && !message.userId) { - payload = await createOrganization(message, category, headers, destinationConfig, baseEndpoint); + payload = await createOrganization( + message, + category, + headers, + destinationConfig, + baseEndpoint, + metadata, + ); url = baseEndpoint + category.createEndpoint; } else { validateUserId(message); @@ -634,6 +682,7 @@ async function processGroup(message, destinationConfig, headers, baseEndpoint) { headers, destinationConfig, baseEndpoint, + metadata, ); if (!orgId) { throw new NetworkInstrumentationError( @@ -648,11 +697,12 @@ async function processGroup(message, destinationConfig, headers, baseEndpoint) { orgId, destinationConfig, baseEndpoint, + metadata, ); url = baseEndpoint + category.userMembershipEndpoint; const userId = payload.organization_membership.user_id; - if (await isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint)) { + if (await isUserAlreadyAssociated(userId, orgId, headers, baseEndpoint, metadata)) { throw new InstrumentationError('User is already associated with organization'); } } @@ -678,15 +728,15 @@ async function processSingleMessage(event) { 'Content-Type': JSON_MIME_TYPE, }; - const { message } = event; + const { message, metadata } = event; const evType = getEventType(message); switch (evType) { case EventType.IDENTIFY: - return processIdentify(message, destinationConfig, headers, baseEndpoint); + return processIdentify(message, destinationConfig, headers, baseEndpoint, metadata); case EventType.GROUP: - return processGroup(message, destinationConfig, headers, baseEndpoint); + return processGroup(message, destinationConfig, headers, baseEndpoint, metadata); case EventType.TRACK: - return processTrack(message, destinationConfig, headers, baseEndpoint); + return processTrack(message, destinationConfig, headers, baseEndpoint, metadata); default: throw new InstrumentationError(`Event type ${evType} is not supported`); }