Skip to content

Commit

Permalink
chore: enable labels for api calls - 3 (#3483)
Browse files Browse the repository at this point in the history
* chore: enable labels for api calls - 3

* chore: propagate metadata for user destination

* chore: propagate metadata in braze

* chore: propagate metadata for api calls in hubspot

* chore: propagate metadata for labels in api calls for
- pardot
- rakuten
- snapchat_custom_audience
- the_trade_desk
- yahoo_dsp

* chore: propagate metadata in zendesk

Signed-off-by: Sai Sankeerth <[email protected]>

* chore: propagate metadata in gainsight

---------

Signed-off-by: Sai Sankeerth <[email protected]>
Co-authored-by: Sai Sankeerth <[email protected]>
  • Loading branch information
sanpj2292 and Sai Sankeerth authored Jun 20, 2024
1 parent 9a5f095 commit e6630d4
Show file tree
Hide file tree
Showing 21 changed files with 241 additions and 135 deletions.
10 changes: 6 additions & 4 deletions src/v0/destinations/braze/braze.util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ describe('dedup utility tests', () => {
};

// Call the function
const users = await BrazeDedupUtility.doApiLookup(identfierChunks, destination);
const users = await BrazeDedupUtility.doApiLookup(identfierChunks, { destination });

// Check the result
expect(users).toEqual([
Expand Down Expand Up @@ -399,7 +399,9 @@ describe('dedup utility tests', () => {
},
}));

const chunkedUserData = await BrazeDedupUtility.doApiLookup(identifierChunks, destination);
const chunkedUserData = await BrazeDedupUtility.doApiLookup(identifierChunks, {
destination,
});
const result = _.flatMap(chunkedUserData);
expect(result).toHaveLength(110);
expect(handleHttpRequest).toHaveBeenCalledTimes(3);
Expand Down Expand Up @@ -455,7 +457,7 @@ describe('dedup utility tests', () => {
},
}));

const users = await BrazeDedupUtility.doApiLookup(chunks, destination);
const users = await BrazeDedupUtility.doApiLookup(chunks, { destination });

expect(handleHttpRequest).toHaveBeenCalledTimes(2);
// Assert that the first chunk was successful and the second failed
Expand Down Expand Up @@ -522,7 +524,7 @@ describe('dedup utility tests', () => {
[{ alias_name: 'alias1', alias_label: 'rudder_id' }],
[{ alias_name: 'alias2', alias_label: 'rudder_id' }],
],
{ Config: { restApiKey: 'xyz' } },
{ destination: { Config: { restApiKey: 'xyz' } } },
);

// restore the original implementation of the mocked functions
Expand Down
5 changes: 3 additions & 2 deletions src/v0/destinations/braze/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ function getUserAttributesObject(message, mappingJson, destination) {
* @param {*} message
* @param {*} destination
*/
async function processIdentify(message, destination) {
async function processIdentify({ message, destination, metadata }) {
const identifyPayload = getIdentifyPayload(message);
const identifyEndpoint = getIdentifyEndpoint(getEndpointFromConfig(destination));
const { processedResponse: brazeIdentifyResp } = await handleHttpRequest(
Expand All @@ -223,6 +223,7 @@ async function processIdentify(message, destination) {
requestMethod: 'POST',
module: 'router',
endpointPath: '/users/identify',
metadata,
},
);

Expand Down Expand Up @@ -517,7 +518,7 @@ async function process(event, processParams = { userStore: new Map() }, reqMetad
const brazeExternalID =
getDestinationExternalID(message, 'brazeExternalId') || message.userId;
if (message.anonymousId && brazeExternalID) {
await processIdentify(message, destination);
await processIdentify(event);
} else {
collectStatsForAliasMissConfigurations(destination.ID);
}
Expand Down
7 changes: 4 additions & 3 deletions src/v0/destinations/braze/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ const BrazeDedupUtility = {
return identfierChunks;
},

async doApiLookup(identfierChunks, destination) {
async doApiLookup(identfierChunks, { destination, metadata }) {
return Promise.all(
identfierChunks.map(async (ids) => {
const externalIdentifiers = ids.filter((id) => id.external_id);
Expand All @@ -167,6 +167,7 @@ const BrazeDedupUtility = {
requestMethod: 'POST',
module: 'router',
endpointPath: '/users/export/ids',
metadata,
},
);
stats.counter('braze_lookup_failure_count', 1, {
Expand All @@ -189,10 +190,10 @@ const BrazeDedupUtility = {
*/
async doLookup(inputs) {
const lookupStartTime = new Date();
const { destination } = inputs[0];
const { destination, metadata } = inputs[0];
const { externalIdsToQuery, aliasIdsToQuery } = this.prepareInputForDedup(inputs);
const identfierChunks = this.prepareChunksForDedup(externalIdsToQuery, aliasIdsToQuery);
const chunkedUserData = await this.doApiLookup(identfierChunks, destination);
const chunkedUserData = await this.doApiLookup(identfierChunks, { destination, metadata });
stats.timing('braze_lookup_time', lookupStartTime, {
destination_id: destination.Config.destinationId,
});
Expand Down
12 changes: 6 additions & 6 deletions src/v0/destinations/gainsight/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const identifyResponseBuilder = (message, { Config }) => {
* Person Object.
* https://support.gainsight.com/Gainsight_NXT/API_and_Developer_Docs/Company_API/Company_API_Documentation
*/
const groupResponseBuilder = async (message, { Config }) => {
const groupResponseBuilder = async (message, { Config }, metadata) => {
const { accessKey } = getConfigOrThrowError(Config, ['accessKey'], 'group');
const groupName = getValueFromMessage(message, 'traits.name');
if (!groupName) {
Expand All @@ -91,7 +91,7 @@ const groupResponseBuilder = async (message, { Config }) => {
throw new InstrumentationError('user email is required for group');
}

const resp = await searchGroup(groupName, Config);
const resp = await searchGroup(groupName, Config, metadata);

let payload = constructPayload(message, groupMapping);
const defaultKeys = Object.keys(payload);
Expand All @@ -104,9 +104,9 @@ const groupResponseBuilder = async (message, { Config }) => {

let groupGsid;
if (resp.data.data.records.length === 0) {
groupGsid = await createGroup(payload, Config);
groupGsid = await createGroup(payload, Config, metadata);
} else {
groupGsid = await updateGroup(payload, Config);
groupGsid = await updateGroup(payload, Config, metadata);

Check warning on line 109 in src/v0/destinations/gainsight/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/gainsight/transform.js#L109

Added line #L109 was not covered by tests
}

const responsePayload = {
Expand Down Expand Up @@ -184,7 +184,7 @@ const trackResponseBuilder = (message, { Config }) => {
* Processing Single event
*/
const process = async (event) => {
const { message, destination } = event;
const { message, destination, metadata } = event;
if (!message.type) {
throw new InstrumentationError('Message Type is not present. Aborting message.');
}
Expand All @@ -196,7 +196,7 @@ const process = async (event) => {
response = identifyResponseBuilder(message, destination);
break;
case EventType.GROUP:
response = await groupResponseBuilder(message, destination);
response = await groupResponseBuilder(message, destination, metadata);
break;
case EventType.TRACK:
response = trackResponseBuilder(message, destination);
Expand Down
9 changes: 6 additions & 3 deletions src/v0/destinations/gainsight/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { ENDPOINTS, getLookupPayload } = require('./config');
const tags = require('../../util/tags');
const { JSON_MIME_TYPE } = require('../../util/constant');

const searchGroup = async (groupName, Config) => {
const searchGroup = async (groupName, Config, metadata) => {
let resp;
try {
resp = await myAxios.post(
Expand All @@ -28,6 +28,7 @@ const searchGroup = async (groupName, Config) => {
requestMethod: 'POST',
endpointPath: '/data/objects/query/Company',
module: 'router',
metadata,
},
);
} catch (error) {
Expand All @@ -48,7 +49,7 @@ const searchGroup = async (groupName, Config) => {
return resp;
};

const createGroup = async (payload, Config) => {
const createGroup = async (payload, Config, metadata) => {
let resp;
try {
resp = await myAxios.post(
Expand All @@ -68,6 +69,7 @@ const createGroup = async (payload, Config) => {
requestMethod: 'POST',
endpointPath: '/data/objects/Company',
module: 'router',
metadata,
},
);
} catch (error) {
Expand All @@ -88,7 +90,7 @@ const createGroup = async (payload, Config) => {
return resp.data.data.records[0].Gsid;
};

const updateGroup = async (payload, Config) => {
const updateGroup = async (payload, Config, metadata) => {
let resp;
try {
resp = await myAxios.put(
Expand All @@ -111,6 +113,7 @@ const updateGroup = async (payload, Config) => {
requestMethod: 'PUT',
endpointPath: '/data/objects/Company',
module: 'router',
metadata,
},
);
} catch (error) {
Expand Down
13 changes: 8 additions & 5 deletions src/v0/destinations/hs/HSTransform-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const { JSON_MIME_TYPE } = require('../../util/constant');
* @param {*} propertyMap
* @returns
*/
const processLegacyIdentify = async (message, destination, propertyMap) => {
const processLegacyIdentify = async ({ message, destination, metadata }, propertyMap) => {
const { Config } = destination;
let traits = getFieldValueFromMessage(message, 'traits');
const mappedToDestination = get(message, MappedToDestinationKey);
Expand Down Expand Up @@ -82,7 +82,7 @@ const processLegacyIdentify = async (message, destination, propertyMap) => {
response.method = defaultPatchRequestConfig.requestMethod;
}

traits = await populateTraits(propertyMap, traits, destination);
traits = await populateTraits(propertyMap, traits, destination, metadata);
response.body.JSON = removeUndefinedAndNullValues({ properties: traits });
response.source = 'rETL';
response.operation = operation;
Expand All @@ -92,7 +92,10 @@ const processLegacyIdentify = async (message, destination, propertyMap) => {
}
const { email } = traits;

const userProperties = await getTransformedJSON(message, destination, propertyMap);
const userProperties = await getTransformedJSON(
{ message, destination, metadata },
propertyMap,
);

const payload = {
properties: formatPropertyValueForIdentify(userProperties),
Expand Down Expand Up @@ -134,7 +137,7 @@ const processLegacyIdentify = async (message, destination, propertyMap) => {
* @param {*} propertyMap
* @returns
*/
const processLegacyTrack = async (message, destination, propertyMap) => {
const processLegacyTrack = async ({ message, destination, metadata }, propertyMap) => {
const { Config } = destination;

if (!Config.hubID) {
Expand All @@ -151,7 +154,7 @@ const processLegacyTrack = async (message, destination, propertyMap) => {
id: getDestinationExternalID(message, 'hubspotId'),
};

const userProperties = await getTransformedJSON(message, destination, propertyMap);
const userProperties = await getTransformedJSON({ message, destination, metadata }, propertyMap);

const payload = { ...parameters, ...userProperties };
const params = removeUndefinedAndNullValues(payload);
Expand Down
10 changes: 5 additions & 5 deletions src/v0/destinations/hs/HSTransform-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const addHsAuthentication = (response, Config) => {
* @param {*} propertyMap
* @returns
*/
const processIdentify = async (message, destination, propertyMap) => {
const processIdentify = async ({ message, destination, metadata }, propertyMap) => {
const { Config } = destination;
let traits = getFieldValueFromMessage(message, 'traits');
const mappedToDestination = get(message, MappedToDestinationKey);
Expand Down Expand Up @@ -125,7 +125,7 @@ const processIdentify = async (message, destination, propertyMap) => {
response.method = defaultPatchRequestConfig.requestMethod;
}

traits = await populateTraits(propertyMap, traits, destination);
traits = await populateTraits(propertyMap, traits, destination, metadata);
response.body.JSON = removeUndefinedAndNullValues({ properties: traits });
response.source = 'rETL';
response.operation = operation;
Expand All @@ -138,10 +138,10 @@ const processIdentify = async (message, destination, propertyMap) => {

// if contactId is not provided then search
if (!contactId) {
contactId = await searchContacts(message, destination);
contactId = await searchContacts(message, destination, metadata);
}

const properties = await getTransformedJSON(message, destination, propertyMap);
const properties = await getTransformedJSON({ message, destination, metadata }, propertyMap);

const payload = {
properties,
Expand Down Expand Up @@ -187,7 +187,7 @@ const processIdentify = async (message, destination, propertyMap) => {
* @param {*} destination
* @returns
*/
const processTrack = async (message, destination) => {
const processTrack = async ({ message, destination }) => {
const { Config } = destination;

let payload = constructPayload(message, mappingConfig[ConfigCategory.TRACK.name]);
Expand Down
31 changes: 17 additions & 14 deletions src/v0/destinations/hs/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
validateDestinationConfig,
} = require('./util');

const processSingleMessage = async (message, destination, propertyMap) => {
const processSingleMessage = async ({ message, destination, metadata }, propertyMap) => {
if (!message.type) {
throw new InstrumentationError('Message type is not present. Aborting message.');
}
Expand All @@ -30,18 +30,18 @@ const processSingleMessage = async (message, destination, propertyMap) => {
case EventType.IDENTIFY: {
response = [];
if (destination.Config.apiVersion === API_VERSION.v3) {
response.push(await processIdentify(message, destination, propertyMap));
response.push(await processIdentify({ message, destination, metadata }, propertyMap));
} else {
// Legacy API
response.push(await processLegacyIdentify(message, destination, propertyMap));
response.push(await processLegacyIdentify({ message, destination, metadata }, propertyMap));
}
break;
}
case EventType.TRACK:
if (destination.Config.apiVersion === API_VERSION.v3) {
response = await processTrack(message, destination, propertyMap);
response = await processTrack({ message, destination }, propertyMap);
} else {
response = await processLegacyTrack(message, destination, propertyMap);
response = await processLegacyTrack({ message, destination, metadata }, propertyMap);
}
break;
default:
Expand All @@ -53,15 +53,19 @@ const processSingleMessage = async (message, destination, propertyMap) => {

// has been deprecated - using routerTransform for both the versions
const process = async (event) => {
const { destination, message } = event;
const { destination, message, metadata } = event;
const mappedToDestination = get(message, MappedToDestinationKey);
let events = [];
events = [event];
if (mappedToDestination && GENERIC_TRUE_VALUES.includes(mappedToDestination?.toString())) {
// get info about existing objects and splitting accordingly.
events = await splitEventsForCreateUpdate([event], destination);
events = await splitEventsForCreateUpdate([event], destination, metadata);
}
return processSingleMessage(events[0].message, events[0].destination);
return processSingleMessage({
message: events[0].message,
destination: events[0].destination,
metadata: events[0].metadata || metadata,
});
};

// we are batching by default at routerTransform
Expand All @@ -71,7 +75,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
const successRespList = [];
const errorRespList = [];
// using the first destination config for transforming the batch
const { destination } = tempInputs[0];
const { destination, metadata } = tempInputs[0];
let propertyMap;
const mappedToDestination = get(tempInputs[0].message, MappedToDestinationKey);
const { objectType } = getDestinationExternalIDInfoForRetl(tempInputs[0].message, 'HS');
Expand All @@ -80,17 +84,17 @@ const processRouterDest = async (inputs, reqMetadata) => {
if (mappedToDestination && GENERIC_TRUE_VALUES.includes(mappedToDestination?.toString())) {
// skip splitting the batches to inserts and updates if object it is an association
if (objectType.toLowerCase() !== 'association') {
propertyMap = await getProperties(destination);
propertyMap = await getProperties(destination, metadata);
// get info about existing objects and splitting accordingly.
tempInputs = await splitEventsForCreateUpdate(tempInputs, destination);
tempInputs = await splitEventsForCreateUpdate(tempInputs, destination, metadata);
}
} else {
// reduce the no. of calls for properties endpoint
const traitsFound = tempInputs.some(
(input) => fetchFinalSetOfTraits(input.message) !== undefined,
);
if (traitsFound) {
propertyMap = await getProperties(destination);
propertyMap = await getProperties(destination, metadata);
}
}
} catch (error) {
Expand All @@ -111,8 +115,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
} else {
// event is not transformed
let receivedResponse = await processSingleMessage(
input.message,
destination,
{ message: input.message, destination, metadata: input.metadata },
propertyMap,
);

Expand Down
Loading

0 comments on commit e6630d4

Please sign in to comment.