Skip to content

Commit

Permalink
fix: all batching final
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed May 3, 2024
1 parent 68ab695 commit 54ed0b2
Show file tree
Hide file tree
Showing 4 changed files with 474 additions and 121 deletions.
21 changes: 14 additions & 7 deletions src/cdk/v2/destinations/emarsys/procWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ bindings:
exportAll: true
- name: removeUndefinedValues
path: ../../../../v0/util
- name: removeUndefinedAndNullValues
path: ../../../../v0/util
- name: defaultRequestConfig
path: ../../../../v0/util
- name: getIntegrationsObj
path: ../../../../v0/util
- name: getFieldValueFromMessage
path: ../../../../v0/util
- path: ./utils
- path: ./config
- path: lodash
Expand Down Expand Up @@ -55,23 +61,24 @@ steps:
ref: https://dev.emarsys.com/docs/core-api-reference/fl0xx6rwfbwqb-trigger-an-external-event
condition: $.outputs.messageType === {{$.EventType.TRACK}}
template: |
const integrationObject = $.getIntegrationsObj(^.message, 'emersys');
const emersysIdentifierId = integrationObject?.customIdentifierId || ^.destination.Config.emersysCustomIdentifier || $.EMAIL_FIELD_ID;
const properties = ^.message.properties;
const integrationObject = $.getIntegrationsObj(^.message, 'emarsys');
const emersysIdentifierId = integrationObject.customIdentifierId ?? ^.destination.Config.emersysCustomIdentifier ?? $.EMAIL_FIELD_ID;
const payload = {
key_id: emersysIdentifierId ,
external_id: $.deduceExternalIdValue(.message,emersysIdentifierId,.destination.Config.fieldMapping),
trigger_id: integrationObject.trigger_id,
key_id: emersysIdentifierId,
external_id: $.deduceExternalIdValue(^.message,emersysIdentifierId,.destination.Config.fieldMapping),
trigger_id: $.integrationObject.trigger_id,
data: properties.data,
attachment: Array.isArray(properties.attachment)
? properties.attachment
: [properties.attachment],
event_time: $.getFieldValueFromMessage(message, 'timestamp'),
event_time: $.getFieldValueFromMessage(^.message, 'timestamp'),
};
$.context.payload = {
eventType: ^.message.type,
destinationPayload: {
payload: $.removeUndefinedAndNullValues(payload),
eventId: $.deduceEventId(.message,.destination.Config),
eventId: $.deduceEventId(^.message,.destination.Config),
},
};
- name: buildResponse
Expand Down
1 change: 0 additions & 1 deletion src/cdk/v2/destinations/emarsys/rtWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ steps:
description: Batches the successfulEvents
template: |
$.context.batchedPayload = $.batchResponseBuilder($.outputs.successfulEvents);
console.log("batchedPayload",JSON.stringify($.context.batchedPayload));
- name: finalPayload
template: |
Expand Down
213 changes: 149 additions & 64 deletions src/cdk/v2/destinations/emarsys/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ const {
removeUndefinedAndNullAndEmptyValues,
removeUndefinedAndNullValues,
isDefinedAndNotNull,
getHashFromArray,
} = require('@rudderstack/integrations-lib');
const {
getIntegrationsObj,
validateEventName,
getValueFromMessage,
getFieldValueFromMessage,
getHashFromArray,
} = require('../../../../v0/util');
const {
EMAIL_FIELD_ID,
Expand All @@ -23,7 +23,6 @@ const {
MAX_BATCH_SIZE_BYTES,
groupedSuccessfulPayload,
} = require('./config');
const { SUPPORTED_EVENT_TYPE } = require('../the_trade_desk_real_time_conversions/config');
const { EventType } = require('../../../../constants');

const base64Sha = (str) => {
Expand Down Expand Up @@ -115,13 +114,23 @@ const deduceExternalIdValue = (message, emersysIdentifier, fieldMapping) => {
emersysIdentifier,
fieldMapping,
);
const externalIdValue = getValueFromMessage(message.context.traits, configuredPayloadProperty);
const externalIdValue = getValueFromMessage(message, [
`traits.${configuredPayloadProperty}`,
`context.traits.${configuredPayloadProperty}`,
]);

if (!isDefinedAndNotNull(deduceExternalIdValue)) {
throw new InstrumentationError(
`Could not find value for externalId required in ${message.type} call. Aborting.`,
);
}

return externalIdValue;
};

const buildGroupPayload = (message, destination) => {
const { emersysCustomIdentifier, defaultContactList, fieldMapping } = destination.Config;
const integrationObject = getIntegrationsObj(message, 'emersys');
const buildGroupPayload = (message, destConfig) => {
const { emersysCustomIdentifier, defaultContactList, fieldMapping } = destConfig;
const integrationObject = getIntegrationsObj(message, 'emarsys');
const emersysIdentifier =
integrationObject?.customIdentifierId || emersysCustomIdentifier || EMAIL_FIELD_ID;
const externalIdValue = deduceExternalIdValue(message, emersysIdentifier, fieldMapping);
Expand Down Expand Up @@ -155,6 +164,7 @@ const deduceEventId = (message, destConfig) => {
if (!eventId) {
throw new ConfigurationError(`${event} is not mapped to any Emersys external event. Aborting`);
}
return eventId;
};

const buildTrackPayload = (message, destination) => {
Expand Down Expand Up @@ -197,7 +207,7 @@ const deduceEndPoint = (finalPayload) => {
case EventType.IDENTIFY:
endPoint = 'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1';
break;
case SUPPORTED_EVENT_TYPE.GROUP:
case EventType.GROUP:
contactListId = destinationPayload.contactListId;
endPoint = `https://api.emarsys.net/api/v2/contactlist/${contactListId}/add`;
break;
Expand Down Expand Up @@ -274,15 +284,15 @@ const createGroupBatches = (events) => {
const grouped = lodash.groupBy(
events,
(item) =>
`${item.message.body.JSON.destinationPayload.payload.key_id}-${item.message.body.JSON.destinationPayload.contactListId}`,
`${item.message[0].body.JSON.destinationPayload.payload.key_id}-${item.message[0].body.JSON.destinationPayload.contactListId}`,
);

// eslint-disable-next-line @typescript-eslint/no-unused-vars
return Object.entries(grouped).flatMap(([key, group]) => {
const keyId = group[0].message.body.JSON.destinationPayload.payload.key_id;
const { contactListId } = group[0].message.body.JSON.destinationPayload;
const keyId = group[0].message[0].body.JSON.destinationPayload.payload.key_id;
const { contactListId } = group[0].message[0].body.JSON.destinationPayload;
const combinedExternalIds = group.reduce((acc, item) => {
acc.push(...item.message.body.JSON.destinationPayload.payload.external_ids);
acc.push(...item.message[0].body.JSON.destinationPayload.payload.external_ids);
return acc;
}, []);

Expand All @@ -299,11 +309,13 @@ const createGroupBatches = (events) => {
});
};

const createTrackBatches = (events) => ({
endpoint: events[0].message.endPoint,
payload: events[0].message.body.JSON.destinationPayload,
metadata: events[0].metadata,
});
const createTrackBatches = (events) => [
{
endpoint: events[0].message[0].endpoint,
payload: events[0].message[0].body.JSON.destinationPayload.payload,
metadata: [events[0].metadata],
},
];
const formatIdentifyPayloadsWithEndpoint = (combinedPayloads, endpointUrl = '') =>
combinedPayloads.map((singleCombinedPayload) => ({
endpoint: endpointUrl,
Expand Down Expand Up @@ -334,76 +346,149 @@ const buildBatchedRequest = (batches, method, constants, batchedStatus = true) =
destination: constants.destination,
}));

const batchResponseBuilder = (successfulEvents) => {
const finaloutput = [];
let batchesOfIdentifyEvents;
if (successfulEvents.length === 0) {
return [];
}
const constants = {
// const batchResponseBuilder = (successfulEvents) => {
// const finaloutput = [];
// let batchesOfIdentifyEvents;
// if (successfulEvents.length === 0) {
// return [];
// }
// const constants = {
// version: successfulEvents[0].message[0].version,
// type: successfulEvents[0].message[0].type,
// headers: successfulEvents[0].message[0].headers,
// destination: successfulEvents[0].destination,
// };

// const typedEventGroups = lodash.groupBy(
// successfulEvents,
// (event) => event.message[0].body.JSON.eventType,
// );
// Object.keys(typedEventGroups).forEach((eachEventGroup) => {
// switch (eachEventGroup) {
// case EventType.IDENTIFY:
// batchesOfIdentifyEvents = createIdentifyBatches(typedEventGroups[eachEventGroup]);
// groupedSuccessfulPayload.identify.batches = formatIdentifyPayloadsWithEndpoint(
// batchesOfIdentifyEvents,
// 'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1',
// );
// break;
// case EventType.GROUP:
// groupedSuccessfulPayload.group.batches = createGroupBatches(
// typedEventGroups[eachEventGroup],
// );
// break;
// case EventType.TRACK:
// groupedSuccessfulPayload.track.batches = createTrackBatches(
// typedEventGroups[eachEventGroup],
// );
// break;
// default:
// break;
// }
// return groupedSuccessfulPayload;
// });
// // Process each identify batch
// if (groupedSuccessfulPayload.identify) {
// const identifyBatches = buildBatchedRequest(
// groupedSuccessfulPayload.identify.batches,
// groupedSuccessfulPayload.identify.method,
// constants,
// );
// finaloutput.push(...identifyBatches);
// }

// // Process each group batch
// if (groupedSuccessfulPayload.group) {
// const groupBatches = buildBatchedRequest(
// groupedSuccessfulPayload.group.batches,
// groupedSuccessfulPayload.group.method,
// constants,
// );
// finaloutput.push(...groupBatches);
// }

// if (groupedSuccessfulPayload.track) {
// const trackBatches = buildBatchedRequest(
// groupedSuccessfulPayload.track.batches,
// groupedSuccessfulPayload.track.method,
// constants,
// false,
// );
// finaloutput.push(...trackBatches);
// }
// return finaloutput;
// };

// Helper to initialize the constants used across batch processing
function initializeConstants(successfulEvents) {
if (successfulEvents.length === 0) return null;
return {
version: successfulEvents[0].message[0].version,
type: successfulEvents[0].message[0].type,
headers: successfulEvents[0].message[0].headers,
destination: successfulEvents[0].destination,
};
}

// Helper to append requests based on batched events and constants
function appendRequestsToOutput(groupPayload, output, constants, batched = true) {
if (groupPayload.batches) {
const requests = buildBatchedRequest(
groupPayload.batches,
groupPayload.method,
constants,
batched,
);
output.push(...requests);
}
}

const typedEventGroups = lodash.groupBy(
successfulEvents,
(event) => event.message[0].body.JSON.eventType,
);
Object.keys(typedEventGroups).forEach((eachEventGroup) => {
switch (eachEventGroup) {
// Process batches based on event types
function processEventBatches(typedEventGroups, constants) {
let batchesOfIdentifyEvents;
const finalOutput = [];

// Process each event group based on type
Object.keys(typedEventGroups).forEach((eventType) => {
switch (eventType) {
case EventType.IDENTIFY:
batchesOfIdentifyEvents = createIdentifyBatches(typedEventGroups[eachEventGroup]);
batchesOfIdentifyEvents = createIdentifyBatches(typedEventGroups[eventType]);
groupedSuccessfulPayload.identify.batches = formatIdentifyPayloadsWithEndpoint(
batchesOfIdentifyEvents,
'https://api.emarsys.net/api/v2/contact/?create_if_not_exists=1',
);
break;
case EventType.GROUP:
groupedSuccessfulPayload.group.batches = createGroupBatches(eachEventGroup);
groupedSuccessfulPayload.group.batches = createGroupBatches(typedEventGroups[eventType]);
break;
case EventType.TRACK:
groupedSuccessfulPayload.track.batches = createTrackBatches(eachEventGroup);
groupedSuccessfulPayload.track.batches = createTrackBatches(typedEventGroups[eventType]);
break;
default:
break;
}
return groupedSuccessfulPayload;
});
console.log('groupedSuccessfulPayload', JSON.stringify(groupedSuccessfulPayload));
// Process each identify batch
if (groupedSuccessfulPayload.identify) {
const identifyBatches = buildBatchedRequest(
groupedSuccessfulPayload.identify.batches,
groupedSuccessfulPayload.identify.method,
constants,
);
finaloutput.push(...identifyBatches);
}

// Process each group batch
if (groupedSuccessfulPayload.group) {
const groupBatches = buildBatchedRequest(
groupedSuccessfulPayload.group.batches,
groupedSuccessfulPayload.group.method,
constants,
);
finaloutput.push(...groupBatches);
}
// Convert batches into requests for each event type and push to final output
appendRequestsToOutput(groupedSuccessfulPayload.identify, finalOutput, constants);
appendRequestsToOutput(groupedSuccessfulPayload.group, finalOutput, constants);
appendRequestsToOutput(groupedSuccessfulPayload.track, finalOutput, constants, false);

if (groupedSuccessfulPayload.track) {
const trackBatches = buildBatchedRequest(
groupedSuccessfulPayload.track.batches,
groupedSuccessfulPayload.track.method,
constants,
false,
);
finaloutput.push(...trackBatches);
}
console.log('FINAL', JSON.stringify(finaloutput));
return finaloutput;
};
return finalOutput;
}

// Entry function to create batches from successful events
function batchResponseBuilder(successfulEvents) {
const constants = initializeConstants(successfulEvents);
if (!constants) return [];

const typedEventGroups = lodash.groupBy(
successfulEvents,
(event) => event.message[0].body.JSON.eventType,
);

return processEventBatches(typedEventGroups, constants);
}

module.exports = {
buildIdentifyPayload,
Expand Down
Loading

0 comments on commit 54ed0b2

Please sign in to comment.