Skip to content

Commit

Permalink
feat: emersys initial batching commit
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Apr 27, 2024
1 parent 7ee45ee commit 4db2602
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/cdk/v2/destinations/emersys/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const CONFIG_CATEGORIES = {
const MAPPING_CONFIG = getMappingConfig(CONFIG_CATEGORIES, __dirname);

module.exports = {
MAX_BATCH_SIZE: 5000,
MAX_BATCH_SIZE: 1000,
BATCH_ENDPOINT,
API_HEADER_METHOD,
API_VERSION,
Expand Down
157 changes: 146 additions & 11 deletions src/cdk/v2/destinations/emersys/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ const {
isDefinedAndNotNullAndNotEmpty,
removeUndefinedAndNullAndEmptyValues,
removeUndefinedAndNullValues,
isDefinedAndNotNull,
} = require('@rudderstack/integrations-lib');
const { getValueFromMessage } = require('rudder-transformer-cdk/build/utils');
const { getIntegrationsObj } = require('../../../../v0/util');
const { EMAIL_FIELD_ID } = require('./config');
const { EMAIL_FIELD_ID, MAX_BATCH_SIZE } = require('./config');

Check failure on line 16 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

'MAX_BATCH_SIZE' is assigned a value but never used

Check failure on line 16 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Code Coverage

'MAX_BATCH_SIZE' is assigned a value but never used

function base64Sha(str) {
const hexDigest = crypto.createHash('sha1').update(str).digest('hex');
Expand All @@ -37,7 +38,7 @@ const buildHeader = (destConfig) => {
};

const buildIdentifyPayload = (message, destination) => {
let identifyPayload;
let destinationPayload;
const { fieldMapping, emersysCustomIdentifier, discardEmptyProperties, defaultContactList } =
destination.Config;
const payload = {};
Expand All @@ -61,7 +62,7 @@ const buildIdentifyPayload = (message, destination) => {
// TODO: add validation for opt in field

if (isDefinedAndNotNullAndNotEmpty(payload[emersysIdentifier])) {
identifyPayload = {
destinationPayload = {
key_id: integrationObject.customIdentifierId || emersysIdentifier,
contacts: [...finalPayload],
contact_list_id: integrationObject.contactListId || defaultContactList,
Expand All @@ -72,7 +73,7 @@ const buildIdentifyPayload = (message, destination) => {
);
}

return identifyPayload;
return { eventType: message.type, destinationPayload };
};

function findRudderPropertyByEmersysProperty(emersysProperty, fieldMapping) {
Expand All @@ -85,22 +86,30 @@ function findRudderPropertyByEmersysProperty(emersysProperty, fieldMapping) {
const buildGroupPayload = (message, destination) => {
const { emersysCustomIdentifier, defaultContactList, fieldMapping } = destination.Config;
const integrationObject = getIntegrationsObj(message, 'emersys');
const emersysIdentifier = emersysCustomIdentifier || EMAIL_FIELD_ID;
const emersysIdentifier =
integrationObject.customIdentifierId || emersysCustomIdentifier || EMAIL_FIELD_ID;
const configuredPayloadProperty = findRudderPropertyByEmersysProperty(
emersysIdentifier,
fieldMapping,
);
const externalIdValue = getValueFromMessage(message.context.traits, configuredPayloadProperty);
if (!isDefinedAndNotNull(externalIdValue)) {
throw new InstrumentationError('');
}
const payload = {
key_id: integrationObject.customIdentifierId || emersysIdentifier,
external_ids: [getValueFromMessage(message.context.traits, configuredPayloadProperty)],
key_id: emersysIdentifier,
external_ids: [externalIdValue],
};
return {
payload,
contactListId: message.groupId || defaultContactList,
eventType: message.type,
destinationPayload: {
payload,
contactListId: message.groupId || defaultContactList,
},
};
};

const deduceEndPoint = (message, destConfig) => {
const deduceEndPoint = (message, destConfig, batchGroupId = undefined) => {
let endPoint;
let contactListId;
const { type, groupId } = message;
Expand All @@ -109,17 +118,143 @@ const deduceEndPoint = (message, destConfig) => {
endPoint = 'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1';
break;
case EVENT_TYPE.GROUP:
contactListId = groupId || destConfig.defaultContactList;
contactListId = batchGroupId || groupId || destConfig.defaultContactList;
endPoint = `https://api.emarsys.net/api/v2/contactlist/${contactListId}/add`;
break;
default:
break;
}
return endPoint;
};

function createIdentifyBatches(events) {
// Grouping the payloads based on key_id and contact_list_id
const groupedIdentifyPayload = lodash.groupBy(
events,
(item) =>
`${item.message.body.JSON.destinationPayload.key_id}-${item.message.body.JSON.destinationPayload.contact_list_id}`,
);

// Combining the contacts within each group and maintaining the payload structure
const combinedPayloads = Object.keys(groupedIdentifyPayload).map((key) => {
const group = groupedIdentifyPayload[key];

// Reduce the group to a single payload with combined contacts
const combinedContacts = group.reduce(
(acc, item) => acc.concat(item.message.body.JSON.destinationPayload.contacts),
[],
);

// Use the first item to extract key_id and contact_list_id
const firstItem = group[0].message.body.JSON.destinationPayload;

return {
key_id: firstItem.key_id,
contacts: combinedContacts,
contact_list_id: firstItem.contact_list_id,
};
});

return combinedPayloads;
}

function createGroupBatches(events) {
const grouped = lodash.groupBy(
events,
(item) =>
`${item.message.body.JSON.destinationPayload.payload.key_id}-${item.message.body.JSON.destinationPayload.contactListId}`,
);

// eslint-disable-next-line @typescript-eslint/no-unused-vars
return Object.entries(grouped).map(([key, group]) => {
const keyId = group[0].message.body.JSON.destinationPayload.payload.key_id;
const { contactListId } = group[0].message.body.JSON.destinationPayload;
const combinedExternalIds = group.reduce((acc, item) => {
const ids = item.message.body.JSON.destinationPayload.payload.external_ids;
return acc.concat(ids);
}, []);

return {
endpoint: `https://api.emarsys.net/api/v2/contactlist/${contactListId}/add`,
payload: {
key_id: keyId,
external_ids: combinedExternalIds,
},
};
});
}
function formatPayloadsWithEndpoint(combinedPayloads, endpointUrl = '') {
return combinedPayloads.map((payload) => ({
endpoint: endpointUrl, // You can dynamically determine or pass this value
payload,
}));
}

function batchResponseBuilder(successfulEvents) {
const groupedSuccessfulPayload = {
identify: {},
group: {},
};
let batchesOfIdentifyEvents;
if (successfulEvents.length === 0) {
return [];
}
const constants = {
version: successfulEvents[0].message[0].version,
type: successfulEvents[0].message[0].type,
method: successfulEvents[0].message[0].method,
headers: successfulEvents[0].message[0].headers,
destination: successfulEvents[0].destination,
};

const typedEventGroups = lodash.groupBy(
successfulEvents,
(event) => event.message.body.JSON.eventType,
);
Object.keys(typedEventGroups).forEach((eachEventGroup) => {
switch (eachEventGroup) {
case EVENT_TYPE.IDENTIFY:
batchesOfIdentifyEvents = createIdentifyBatches(eachEventGroup);
groupedSuccessfulPayload.identify = formatPayloadsWithEndpoint(
batchesOfIdentifyEvents,
'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1',
);
break;
case EVENT_TYPE.GROUP:
groupedSuccessfulPayload.group = createGroupBatches(eachEventGroup);
break;
default:
break;
}
return groupedSuccessfulPayload;
});

return chunkedElements.map((elementsBatch, index) => ({

Check failure on line 232 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

'chunkedElements' is not defined

Check failure on line 232 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Code Coverage

'chunkedElements' is not defined
batchedRequest: {
body: {
JSON: { elements: elementsBatch },
JSON_ARRAY: {},
XML: {},
FORM: {},
},
version: constants.version,
type: constants.type,
method: constants.method,
endpoint: constants.endpoint,
headers: constants.headers,
params: {},
files: {},
},
metadata: chunkedMetadata[index],

Check failure on line 248 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

'chunkedMetadata' is not defined

Check failure on line 248 in src/cdk/v2/destinations/emersys/utils.js

View workflow job for this annotation

GitHub Actions / Code Coverage

'chunkedMetadata' is not defined
batched: true,
statusCode: 200,
destination: constants.destination,
}));
}
module.exports = {
buildIdentifyPayload,
buildGroupPayload,
buildHeader,
deduceEndPoint,
batchResponseBuilder,
};

0 comments on commit 4db2602

Please sign in to comment.