From 26926c40fcbf4c146a37ac16c2cc7280e110a6e6 Mon Sep 17 00:00:00 2001 From: Anant Jain <62471433+anantjain45823@users.noreply.github.com> Date: Fri, 30 Aug 2024 14:22:55 +0530 Subject: [PATCH] fix: klaviyo jobs order (#3686) * fix: klaviyo jobs order * chore: update src/v0/destinations/klaviyo/transform.js --- src/v0/destinations/klaviyo/transform.js | 30 +++++++++++--- src/v0/destinations/klaviyo/transformV2.js | 22 +---------- .../destinations/klaviyo/router/data.ts | 39 ++++++++++++++++++- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/src/v0/destinations/klaviyo/transform.js b/src/v0/destinations/klaviyo/transform.js index d5047a4220..d73dbca600 100644 --- a/src/v0/destinations/klaviyo/transform.js +++ b/src/v0/destinations/klaviyo/transform.js @@ -13,7 +13,7 @@ const { eventNameMapping, jsonNameMapping, } = require('./config'); -const { processRouterDestV2, processV2 } = require('./transformV2'); +const { processRouter: processRouterV2, processV2 } = require('./transformV2'); const { createCustomerProperties, subscribeUserToList, @@ -35,6 +35,7 @@ const { adduserIdFromExternalId, getSuccessRespEvents, handleRtTfSingleEventError, + groupEventsByType, flattenJson, isNewStatusCodesAccepted, } = require('../../util'); @@ -333,11 +334,11 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => { } }; -const processRouterDest = async (inputs, reqMetadata) => { +const processRouter = async (inputs, reqMetadata) => { const { destination } = inputs[0]; // This is used to switch to latest API version if (destination.Config?.apiVersion === 'v2') { - return processRouterDestV2(inputs, reqMetadata); + return processRouterV2(inputs, reqMetadata); } let batchResponseList = []; const batchErrorRespList = []; @@ -403,7 +404,26 @@ const processRouterDest = async (inputs, reqMetadata) => { batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList]; - return [...batchResponseList, ...batchErrorRespList]; + return { successEvents: batchResponseList, errorEvents: batchErrorRespList }; +}; +const processRouterDest = async (inputs, reqMetadata) => { + /** + We are doing this to maintain the order of events not only fo transformation but for delivery as well + Job Id: 1 2 3 4 5 6 + Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3'] + Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']] + Output after transformation: [1, [2,4], [3,5,6]] + */ + const inputsGroupedByType = groupEventsByType(inputs); + const respList = []; + const errList = []; + await Promise.all( + inputsGroupedByType.map(async (typedEventList) => { + const { successEvents, errorEvents } = await processRouter(typedEventList, reqMetadata); + respList.push(...successEvents); + errList.push(...errorEvents); + }), + ); + return [...respList, ...errList]; }; - module.exports = { process, processRouterDest }; diff --git a/src/v0/destinations/klaviyo/transformV2.js b/src/v0/destinations/klaviyo/transformV2.js index 6d04cb8644..bcb028bcea 100644 --- a/src/v0/destinations/klaviyo/transformV2.js +++ b/src/v0/destinations/klaviyo/transformV2.js @@ -22,7 +22,6 @@ const { handleRtTfSingleEventError, addExternalIdToTraits, adduserIdFromExternalId, - groupEventsByType, flattenJson, isDefinedAndNotNull, } = require('../../util'); @@ -242,23 +241,4 @@ const processRouter = (inputs, reqMetadata) => { return { successEvents: batchResponseList, errorEvents: batchErrorRespList }; }; -const processRouterDestV2 = (inputs, reqMetadata) => { - /** - We are doing this to maintain the order of events not only fo transformation but for delivery as well - Job Id: 1 2 3 4 5 6 - Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3'] - Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']] - Output after transformation: [1, [2,4], [3,5,6]] - */ - const inputsGroupedByType = groupEventsByType(inputs); - const respList = []; - const errList = []; - inputsGroupedByType.forEach((typedEventList) => { - const { successEvents, errorEvents } = processRouter(typedEventList, reqMetadata); - respList.push(...successEvents); - errList.push(...errorEvents); - }); - return [...respList, ...errList]; -}; - -module.exports = { processV2, processRouterDestV2 }; +module.exports = { processV2, processRouter }; diff --git a/test/integrations/destinations/klaviyo/router/data.ts b/test/integrations/destinations/klaviyo/router/data.ts index 8e8c507f48..1b3f7f39ad 100644 --- a/test/integrations/destinations/klaviyo/router/data.ts +++ b/test/integrations/destinations/klaviyo/router/data.ts @@ -63,6 +63,43 @@ export const data: RouterTestData[] = [ list_id: 'XUepkK', subscriptions: [ { email: 'test@rudderstack.com', phone_number: '+12 345 678 900' }, + ], + }, + }, + }, + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + files: {}, + }, + ], + metadata: [generateMetadata(3)], + batched: true, + statusCode: 200, + destination, + }, + { + batchedRequest: [ + { + version: '1', + type: 'REST', + method: 'POST', + endpoint: 'https://a.klaviyo.com/api/profile-subscription-bulk-create-jobs', + headers: { + Authorization: 'Klaviyo-API-Key dummyPrivateApiKey', + 'Content-Type': 'application/json', + Accept: 'application/json', + revision: '2023-02-22', + }, + params: {}, + body: { + JSON: { + data: { + type: 'profile-subscription-bulk-create-job', + attributes: { + list_id: 'XUepkK', + subscriptions: [ { email: 'test@rudderstack.com', phone_number: '+12 345 578 900', @@ -120,7 +157,7 @@ export const data: RouterTestData[] = [ files: {}, }, ], - metadata: [generateMetadata(3), generateMetadata(2)], + metadata: [generateMetadata(2)], batched: true, statusCode: 200, destination,