From 4db2602bf46f728f0d41eb07128a74f5014c9384 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Sun, 28 Apr 2024 01:15:23 +0530 Subject: [PATCH] feat: emersys initial batching commit --- src/cdk/v2/destinations/emersys/config.js | 2 +- src/cdk/v2/destinations/emersys/utils.js | 157 ++++++++++++++++++++-- 2 files changed, 147 insertions(+), 12 deletions(-) diff --git a/src/cdk/v2/destinations/emersys/config.js b/src/cdk/v2/destinations/emersys/config.js index 344980e7d0..a15044b19d 100644 --- a/src/cdk/v2/destinations/emersys/config.js +++ b/src/cdk/v2/destinations/emersys/config.js @@ -16,7 +16,7 @@ const CONFIG_CATEGORIES = { const MAPPING_CONFIG = getMappingConfig(CONFIG_CATEGORIES, __dirname); module.exports = { - MAX_BATCH_SIZE: 5000, + MAX_BATCH_SIZE: 1000, BATCH_ENDPOINT, API_HEADER_METHOD, API_VERSION, diff --git a/src/cdk/v2/destinations/emersys/utils.js b/src/cdk/v2/destinations/emersys/utils.js index 4741fb0d36..20c6e5924c 100644 --- a/src/cdk/v2/destinations/emersys/utils.js +++ b/src/cdk/v2/destinations/emersys/utils.js @@ -9,10 +9,11 @@ const { isDefinedAndNotNullAndNotEmpty, removeUndefinedAndNullAndEmptyValues, removeUndefinedAndNullValues, + isDefinedAndNotNull, } = require('@rudderstack/integrations-lib'); const { getValueFromMessage } = require('rudder-transformer-cdk/build/utils'); const { getIntegrationsObj } = require('../../../../v0/util'); -const { EMAIL_FIELD_ID } = require('./config'); +const { EMAIL_FIELD_ID, MAX_BATCH_SIZE } = require('./config'); function base64Sha(str) { const hexDigest = crypto.createHash('sha1').update(str).digest('hex'); @@ -37,7 +38,7 @@ const buildHeader = (destConfig) => { }; const buildIdentifyPayload = (message, destination) => { - let identifyPayload; + let destinationPayload; const { fieldMapping, emersysCustomIdentifier, discardEmptyProperties, defaultContactList } = destination.Config; const payload = {}; @@ -61,7 +62,7 @@ const buildIdentifyPayload = (message, destination) => { // TODO: add validation for opt in field if (isDefinedAndNotNullAndNotEmpty(payload[emersysIdentifier])) { - identifyPayload = { + destinationPayload = { key_id: integrationObject.customIdentifierId || emersysIdentifier, contacts: [...finalPayload], contact_list_id: integrationObject.contactListId || defaultContactList, @@ -72,7 +73,7 @@ const buildIdentifyPayload = (message, destination) => { ); } - return identifyPayload; + return { eventType: message.type, destinationPayload }; }; function findRudderPropertyByEmersysProperty(emersysProperty, fieldMapping) { @@ -85,22 +86,30 @@ function findRudderPropertyByEmersysProperty(emersysProperty, fieldMapping) { const buildGroupPayload = (message, destination) => { const { emersysCustomIdentifier, defaultContactList, fieldMapping } = destination.Config; const integrationObject = getIntegrationsObj(message, 'emersys'); - const emersysIdentifier = emersysCustomIdentifier || EMAIL_FIELD_ID; + const emersysIdentifier = + integrationObject.customIdentifierId || emersysCustomIdentifier || EMAIL_FIELD_ID; const configuredPayloadProperty = findRudderPropertyByEmersysProperty( emersysIdentifier, fieldMapping, ); + const externalIdValue = getValueFromMessage(message.context.traits, configuredPayloadProperty); + if (!isDefinedAndNotNull(externalIdValue)) { + throw new InstrumentationError(''); + } const payload = { - key_id: integrationObject.customIdentifierId || emersysIdentifier, - external_ids: [getValueFromMessage(message.context.traits, configuredPayloadProperty)], + key_id: emersysIdentifier, + external_ids: [externalIdValue], }; return { - payload, - contactListId: message.groupId || defaultContactList, + eventType: message.type, + destinationPayload: { + payload, + contactListId: message.groupId || defaultContactList, + }, }; }; -const deduceEndPoint = (message, destConfig) => { +const deduceEndPoint = (message, destConfig, batchGroupId = undefined) => { let endPoint; let contactListId; const { type, groupId } = message; @@ -109,7 +118,7 @@ const deduceEndPoint = (message, destConfig) => { endPoint = 'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1'; break; case EVENT_TYPE.GROUP: - contactListId = groupId || destConfig.defaultContactList; + contactListId = batchGroupId || groupId || destConfig.defaultContactList; endPoint = `https://api.emarsys.net/api/v2/contactlist/${contactListId}/add`; break; default: @@ -117,9 +126,135 @@ const deduceEndPoint = (message, destConfig) => { } return endPoint; }; + +function createIdentifyBatches(events) { + // Grouping the payloads based on key_id and contact_list_id + const groupedIdentifyPayload = lodash.groupBy( + events, + (item) => + `${item.message.body.JSON.destinationPayload.key_id}-${item.message.body.JSON.destinationPayload.contact_list_id}`, + ); + + // Combining the contacts within each group and maintaining the payload structure + const combinedPayloads = Object.keys(groupedIdentifyPayload).map((key) => { + const group = groupedIdentifyPayload[key]; + + // Reduce the group to a single payload with combined contacts + const combinedContacts = group.reduce( + (acc, item) => acc.concat(item.message.body.JSON.destinationPayload.contacts), + [], + ); + + // Use the first item to extract key_id and contact_list_id + const firstItem = group[0].message.body.JSON.destinationPayload; + + return { + key_id: firstItem.key_id, + contacts: combinedContacts, + contact_list_id: firstItem.contact_list_id, + }; + }); + + return combinedPayloads; +} + +function createGroupBatches(events) { + const grouped = lodash.groupBy( + events, + (item) => + `${item.message.body.JSON.destinationPayload.payload.key_id}-${item.message.body.JSON.destinationPayload.contactListId}`, + ); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + return Object.entries(grouped).map(([key, group]) => { + const keyId = group[0].message.body.JSON.destinationPayload.payload.key_id; + const { contactListId } = group[0].message.body.JSON.destinationPayload; + const combinedExternalIds = group.reduce((acc, item) => { + const ids = item.message.body.JSON.destinationPayload.payload.external_ids; + return acc.concat(ids); + }, []); + + return { + endpoint: `https://api.emarsys.net/api/v2/contactlist/${contactListId}/add`, + payload: { + key_id: keyId, + external_ids: combinedExternalIds, + }, + }; + }); +} +function formatPayloadsWithEndpoint(combinedPayloads, endpointUrl = '') { + return combinedPayloads.map((payload) => ({ + endpoint: endpointUrl, // You can dynamically determine or pass this value + payload, + })); +} + +function batchResponseBuilder(successfulEvents) { + const groupedSuccessfulPayload = { + identify: {}, + group: {}, + }; + let batchesOfIdentifyEvents; + if (successfulEvents.length === 0) { + return []; + } + const constants = { + version: successfulEvents[0].message[0].version, + type: successfulEvents[0].message[0].type, + method: successfulEvents[0].message[0].method, + headers: successfulEvents[0].message[0].headers, + destination: successfulEvents[0].destination, + }; + + const typedEventGroups = lodash.groupBy( + successfulEvents, + (event) => event.message.body.JSON.eventType, + ); + Object.keys(typedEventGroups).forEach((eachEventGroup) => { + switch (eachEventGroup) { + case EVENT_TYPE.IDENTIFY: + batchesOfIdentifyEvents = createIdentifyBatches(eachEventGroup); + groupedSuccessfulPayload.identify = formatPayloadsWithEndpoint( + batchesOfIdentifyEvents, + 'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1', + ); + break; + case EVENT_TYPE.GROUP: + groupedSuccessfulPayload.group = createGroupBatches(eachEventGroup); + break; + default: + break; + } + return groupedSuccessfulPayload; + }); + + return chunkedElements.map((elementsBatch, index) => ({ + batchedRequest: { + body: { + JSON: { elements: elementsBatch }, + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + version: constants.version, + type: constants.type, + method: constants.method, + endpoint: constants.endpoint, + headers: constants.headers, + params: {}, + files: {}, + }, + metadata: chunkedMetadata[index], + batched: true, + statusCode: 200, + destination: constants.destination, + })); +} module.exports = { buildIdentifyPayload, buildGroupPayload, buildHeader, deduceEndPoint, + batchResponseBuilder, };