Skip to content

Commit

Permalink
feat: introduce job ordering in klaviyo
Browse files Browse the repository at this point in the history
  • Loading branch information
anantjain45823 committed Jul 23, 2024
1 parent 09e06ce commit e067fff
Show file tree
Hide file tree
Showing 4 changed files with 574 additions and 324 deletions.
28 changes: 24 additions & 4 deletions src/v0/destinations/klaviyo/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
handleRtTfSingleEventError,
addExternalIdToTraits,
adduserIdFromExternalId,
groupEventsByType,
flattenJson,
} = require('../../util');

Expand Down Expand Up @@ -174,7 +175,7 @@ const getEventChunks = (input, subscribeRespList, profileRespList, eventRespList
}
};

const processRouterDestV2 = (inputs, reqMetadata) => {
const processRouter = (inputs, reqMetadata) => {
const batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
Expand Down Expand Up @@ -208,12 +209,31 @@ const processRouterDestV2 = (inputs, reqMetadata) => {
profileRespList,
destination,
);
const { anonymousTracking, identifiedTracking } = getTrackRequests(eventRespList, destination);
const trackRespList = getTrackRequests(eventRespList, destination);

// We are doing to maintain event ordering basically once a user is identified klaviyo does not allow user tracking based upon anonymous_id only
batchResponseList.push(...anonymousTracking, ...batchedResponseList, ...identifiedTracking);
batchResponseList.push(...trackRespList, ...batchedResponseList);

return { successEvents: batchResponseList, errorEvents: batchErrorRespList };
};

return [...batchResponseList, ...batchErrorRespList];
const processRouterDestV2 = (inputs, reqMetadata) => {
/**
We are doing this to maintain the order of events not only fo transformation but for delivery as well
Job Id: 1 2 3 4 5 6
Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3']
Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']]
Output after transformation: [1, [2,4], [3,5,6]]
*/
const inputsGroupedByType = groupEventsByType(inputs);
const respList = [];
const errList = [];
inputsGroupedByType.forEach((typedEventList) => {
const { successEvents, errorEvents } = processRouter(typedEventList, reqMetadata);
respList.push(...successEvents);
errList.push(...errorEvents);
});
return [...respList, ...errList];
};

module.exports = { processV2, processRouterDestV2 };
30 changes: 11 additions & 19 deletions src/v0/destinations/klaviyo/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -544,25 +544,17 @@ const buildSubscriptionRequest = (subscription, destination) => {

const getTrackRequests = (eventRespList, destination) => {
// building and pushing all the event requests
const anonymousTracking = [];
const identifiedTracking = [];
eventRespList.forEach((resp) => {
const { payload, metadata } = resp;
const { attributes: profileAttributes } = payload.data.attributes.profile.data;
// eslint-disable-next-line @typescript-eslint/naming-convention
const { email, phone_number, external_id } = profileAttributes;
const request = getSuccessRespEvents(
buildRequest(payload, destination, CONFIG_CATEGORIES.TRACKV2),
[metadata],
destination,
);
if (email || phone_number || external_id) {
identifiedTracking.push(request);
} else {
anonymousTracking.push(request);
}
});
return { anonymousTracking, identifiedTracking };
const respList = [];
eventRespList.forEach((resp) =>
respList.push(
getSuccessRespEvents(
buildRequest(resp.payload, destination, CONFIG_CATEGORIES.TRACKV2),
[resp.metadata],
destination,
),
),
);
return respList;
};

/**
Expand Down
Loading

0 comments on commit e067fff

Please sign in to comment.