Skip to content

Commit

Permalink
fix: klaviyo jobs order (#3686)
Browse files Browse the repository at this point in the history
* fix: klaviyo jobs order

* chore: update src/v0/destinations/klaviyo/transform.js
  • Loading branch information
anantjain45823 authored Aug 30, 2024
1 parent 494df08 commit 26926c4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 27 deletions.
30 changes: 25 additions & 5 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
eventNameMapping,
jsonNameMapping,
} = require('./config');
const { processRouterDestV2, processV2 } = require('./transformV2');
const { processRouter: processRouterV2, processV2 } = require('./transformV2');
const {
createCustomerProperties,
subscribeUserToList,
Expand All @@ -35,6 +35,7 @@ const {
adduserIdFromExternalId,
getSuccessRespEvents,
handleRtTfSingleEventError,
groupEventsByType,
flattenJson,
isNewStatusCodesAccepted,
} = require('../../util');
Expand Down Expand Up @@ -333,11 +334,11 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => {
}
};

const processRouterDest = async (inputs, reqMetadata) => {
const processRouter = async (inputs, reqMetadata) => {
const { destination } = inputs[0];
// This is used to switch to latest API version
if (destination.Config?.apiVersion === 'v2') {
return processRouterDestV2(inputs, reqMetadata);
return processRouterV2(inputs, reqMetadata);
}
let batchResponseList = [];
const batchErrorRespList = [];
Expand Down Expand Up @@ -403,7 +404,26 @@ const processRouterDest = async (inputs, reqMetadata) => {

batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList];

return [...batchResponseList, ...batchErrorRespList];
return { successEvents: batchResponseList, errorEvents: batchErrorRespList };
};
const processRouterDest = async (inputs, reqMetadata) => {
/**
We are doing this to maintain the order of events not only fo transformation but for delivery as well
Job Id: 1 2 3 4 5 6
Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3']
Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']]
Output after transformation: [1, [2,4], [3,5,6]]
*/
const inputsGroupedByType = groupEventsByType(inputs);
const respList = [];
const errList = [];
await Promise.all(
inputsGroupedByType.map(async (typedEventList) => {
const { successEvents, errorEvents } = await processRouter(typedEventList, reqMetadata);
respList.push(...successEvents);
errList.push(...errorEvents);
}),
);
return [...respList, ...errList];
};

module.exports = { process, processRouterDest };
22 changes: 1 addition & 21 deletions src/v0/destinations/klaviyo/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
handleRtTfSingleEventError,
addExternalIdToTraits,
adduserIdFromExternalId,
groupEventsByType,
flattenJson,
isDefinedAndNotNull,
} = require('../../util');
Expand Down Expand Up @@ -242,23 +241,4 @@ const processRouter = (inputs, reqMetadata) => {
return { successEvents: batchResponseList, errorEvents: batchErrorRespList };
};

const processRouterDestV2 = (inputs, reqMetadata) => {
/**
We are doing this to maintain the order of events not only fo transformation but for delivery as well
Job Id: 1 2 3 4 5 6
Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3']
Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']]
Output after transformation: [1, [2,4], [3,5,6]]
*/
const inputsGroupedByType = groupEventsByType(inputs);
const respList = [];
const errList = [];
inputsGroupedByType.forEach((typedEventList) => {
const { successEvents, errorEvents } = processRouter(typedEventList, reqMetadata);
respList.push(...successEvents);
errList.push(...errorEvents);
});
return [...respList, ...errList];
};

module.exports = { processV2, processRouterDestV2 };
module.exports = { processV2, processRouter };
39 changes: 38 additions & 1 deletion test/integrations/destinations/klaviyo/router/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,43 @@ export const data: RouterTestData[] = [
list_id: 'XUepkK',
subscriptions: [
{ email: '[email protected]', phone_number: '+12 345 678 900' },
],
},
},
},
JSON_ARRAY: {},
XML: {},
FORM: {},
},
files: {},
},
],
metadata: [generateMetadata(3)],
batched: true,
statusCode: 200,
destination,
},
{
batchedRequest: [
{
version: '1',
type: 'REST',
method: 'POST',
endpoint: 'https://a.klaviyo.com/api/profile-subscription-bulk-create-jobs',
headers: {
Authorization: 'Klaviyo-API-Key dummyPrivateApiKey',
'Content-Type': 'application/json',
Accept: 'application/json',
revision: '2023-02-22',
},
params: {},
body: {
JSON: {
data: {
type: 'profile-subscription-bulk-create-job',
attributes: {
list_id: 'XUepkK',
subscriptions: [
{
email: '[email protected]',
phone_number: '+12 345 578 900',
Expand Down Expand Up @@ -120,7 +157,7 @@ export const data: RouterTestData[] = [
files: {},
},
],
metadata: [generateMetadata(3), generateMetadata(2)],
metadata: [generateMetadata(2)],
batched: true,
statusCode: 200,
destination,
Expand Down

0 comments on commit 26926c4

Please sign in to comment.