Skip to content

Commit

Permalink
fix: batch logic to be more clear
Browse files Browse the repository at this point in the history
  • Loading branch information
anantjain45823 committed Jul 25, 2024
1 parent e067fff commit c9a4837
Show file tree
Hide file tree
Showing 6 changed files with 947 additions and 780 deletions.
284 changes: 150 additions & 134 deletions src/v0/destinations/klaviyo/batchUtil.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
const lodash = require('lodash');
const { defaultBatchRequestConfig, getSuccessRespEvents } = require('../../util');
const {
defaultBatchRequestConfig,
getSuccessRespEvents,
isDefinedAndNotNull,
} = require('../../util');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { BASE_ENDPOINT, CONFIG_CATEGORIES, MAX_BATCH_SIZE, revision } = require('./config');
const { buildRequest, getSubscriptionPayload } = require('./util');

/**
* This function groups the subscription responses on list id
* @param {*} subscribeResponseList
Expand All @@ -22,165 +27,176 @@ const groupSubscribeResponsesUsingListIdV2 = (subscribeResponseList) => {
};

/**
* This function returns the list of profileReq which do not metadata common with subcriptionMetadataArray
* @param {*} profileReq
* @param {*} subscriptionMetadataArray
* @returns
*/
const getRemainingProfiles = (profileReq, subscriptionMetadataArray) => {
const subscriptionListJobIds = subscriptionMetadataArray.map((metadata) => metadata.jobId);
return profileReq.filter((profile) => !subscriptionListJobIds.includes(profile.metadata.jobId));
};

/**
* This function builds all the profile requests whose metadata is not there in subscriptionMetadataArray
* @param {*} profileRespList
* @param {*} subscriptionMetadataArray
* @param {*} destination
* @returns
*/
const getProfiles = (profileRespList, subscriptionMetadataArray, destination) => {
const profiles = [];
const remainingProfileReq = getRemainingProfiles(profileRespList, subscriptionMetadataArray);
remainingProfileReq.forEach((input) => {
profiles.push(
getSuccessRespEvents(
buildRequest(input.payload, destination, CONFIG_CATEGORIES.IDENTIFYV2),
[input.metadata],
destination,
),
);
});
return profiles;
};

/**
* This function takes susbscriptions as input and batches them into a single request body
* @param {events}
* events= [
* { payload: {id:'list_id', profile: {}}, metadata:{} },
* { payload: {id:'list_id', profile: {}}, metadata:{} }
* ]
* This function takes susbscription as input and batches them into a single request body
* @param {subscription}
* subscription= {listId, subscriptionProfileList}
*/
const generateBatchedSubscriptionRequest = (events, destination) => {
const generateBatchedSubscriptionRequest = (subscription, destination) => {
const batchEventResponse = defaultBatchRequestConfig();
const metadata = [];
// if( !isDefinedAndNotNull(subscription) )
// fetching listId from first event as listId is same for all the events
const listId = events[0].payload?.listId;
const profiles = []; // list of profiles to be subscribes
// Batch profiles into dest batch structure
events.forEach((ev) => {
profiles.push(...ev.payload.profile);
metadata.push(ev.metadata);
});

batchEventResponse.batchedRequest = Object.values(batchEventResponse);
batchEventResponse.batchedRequest[0].body.JSON = getSubscriptionPayload(listId, profiles);

batchEventResponse.batchedRequest[0].endpoint = `${BASE_ENDPOINT}/api/profile-subscription-bulk-create-jobs`;

batchEventResponse.batchedRequest[0].headers = {
const profiles = []; // list of profiles to be subscribed
const { listId, subscriptionProfileList } = subscription;
subscriptionProfileList.forEach((profileList) => profiles.push(...profileList));
batchEventResponse.batchedRequest.body.JSON = getSubscriptionPayload(listId, profiles);
batchEventResponse.batchedRequest.endpoint = `${BASE_ENDPOINT}/api/profile-subscription-bulk-create-jobs`;
batchEventResponse.batchedRequest.headers = {
Authorization: `Klaviyo-API-Key ${destination.Config.privateApiKey}`,
'Content-Type': JSON_MIME_TYPE,
Accept: JSON_MIME_TYPE,
revision,
};

return {
...batchEventResponse,
metadata,
destination,
};
return batchEventResponse.batchedRequest;
};

/**
* This function fetches the profileRequests with metadata present in metadata array build a request for them
* and add these requests batchEvent Response
* @param {*} profileReq array of profile requests
* @param {*} metadataArray array of metadata
* @param {*} batchEventResponse
* Example: /**
*
* @param {*} subscribeEventGroups
* @param {*} identifyResponseList
* @returns
* Example:
* profileReq = [
* { payload: {}, metadata:{} },
* { payload: {}, metadata:{} }
* ]
* This function updates batchedRequest with profile requests
* @param {*} profiles
* @param {*} batchedRequest
* @param {*} destination
*/
const updateBatchEventResponseWithProfileRequests = (
profileReqArr,
subscriptionMetadataArray,
batchEventResponse,
) => {
const subscriptionListJobIds = subscriptionMetadataArray.map((metadata) => metadata.jobId);
const updateBatchEventResponseWithProfileRequests = (profiles, batchedRequest, destination) => {
const profilesRequests = [];
profileReqArr.forEach((profile) => {
if (subscriptionListJobIds.includes(profile.metadata.jobId)) {
profilesRequests.push(
buildRequest(profile.payload, batchEventResponse.destination, CONFIG_CATEGORIES.IDENTIFYV2),
);
}
profiles.forEach((profile) => {
profilesRequests.push(buildRequest(profile, destination, CONFIG_CATEGORIES.IDENTIFYV2));
});
// we are keeping profiles request prior to subscription ones
batchEventResponse.batchedRequest.unshift(...profilesRequests);
// we are keeping profiles request prior to subscription ones as first profile creation and then subscription should happen
batchedRequest.unshift(...profilesRequests);
};

const processSubscribeChunk = (chunk, destination, profileRespList) => {
const batchEventResponse = generateBatchedSubscriptionRequest(chunk, destination);
const { metadata: subscriptionMetadataArray } = batchEventResponse;
updateBatchEventResponseWithProfileRequests(
profileRespList,
subscriptionMetadataArray,
batchEventResponse,
);
return batchEventResponse;
/**
* this function populates profileSubscriptionAndMetadataArr with respective profiles based upon common metadata
* @param {*} profileSubscriptionAndMetadataArr
* @param {*} metadataIndexMap
* @param {*} profiles
* @returns updated profileSubscriptionAndMetadataArr obj
*/
const populateArrWithRespectiveProfileData = (
profileSubscriptionAndMetadataArr,
metadataIndexMap,
profiles,
) => {
const updatedPSMArr = profileSubscriptionAndMetadataArr;
profiles.forEach((profile) => {
const index = metadataIndexMap[profile.metadata.jobId];
if (isDefinedAndNotNull(index)) {
// using isDefinedAndNotNull as index can be 0
updatedPSMArr[index].profiles.push(profile.payload);
} else {
// in case there is no subscription for a given profile
updatedPSMArr.push({
profiles: [profile.payload],
metadataList: [profile.metadata],
});
}
});
return updatedPSMArr;
};

/**
* This function batches the requests. Alogorithm
* Batch events from Subscribe Resp List having same listId/groupId to be subscribed and have their metadata array
* For this metadata array get all profileRequests and add them prior to batched Subscribe Request in the same batched Request
* Make another batched request for the remaning profile requests and another for all the event requests
* @param {*} subscribeRespList
* @param {*} profileRespList
* @param {*} eventRespList
* subscribeRespList = [
* { payload: {id:'list_id', profile: {}}, metadata:{} },
* { payload: {id:'list_id', profile: {}}, metadata:{} }
* ]
* profileRespList = [
* { payload: {}, metadata:{} },
* { payload: {}, metadata:{} }
* ]
*
* This function generates the final output batched payload for each object in profileSubscriptionAndMetadataArr
* ex:
* profileSubscriptionAndMetadataArr = [
{
subscription: { subscriptionProfileList, listId1 },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
metadataList2,
},
{
metadataList3,
profiles: [respectiveProfiles for above metadata with no subscription]
}
]
* @param {*} profileSubscriptionAndMetadataArr
* @param {*} destination
* @returns
*/
const batchSubscriptionRequestV2 = (subscribeRespList, profileRespList, destination) => {
const buildRequestsForProfileSubscriptionAndMetadataArr = (
profileSubscriptionAndMetadataArr,
destination,
) => {
const batchedResponseList = [];
const subscriptionMetadataArrayForAll = [];
profileSubscriptionAndMetadataArr.forEach((input) => {
const batchedRequest = [];
if (input.subscription) {
batchedRequest.push(generateBatchedSubscriptionRequest(input.subscription, destination));
}
updateBatchEventResponseWithProfileRequests(input.profiles, batchedRequest, destination);
batchedResponseList.push(
getSuccessRespEvents(batchedRequest, input.metadataList, destination, true),
);
});
return batchedResponseList;
};

const batchRequestV2 = (subscribeRespList, profileRespList, destination) => {
const subscribeEventGroups = groupSubscribeResponsesUsingListIdV2(subscribeRespList);
let profileSubscriptionAndMetadataArr = [];
const metadataIndexMap = {};
Object.keys(subscribeEventGroups).forEach((listId) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(subscribeEventGroups[listId], MAX_BATCH_SIZE);
const batchedResponse = [];
eventChunks.forEach((chunk) => {
// returns subscriptionMetadata and batchEventResponse
const { metadata: subscriptionMetadataArray, batchedRequest } = processSubscribeChunk(
chunk,
destination,
profileRespList,
);
subscriptionMetadataArrayForAll.push(...subscriptionMetadataArray);
batchedResponse.push(
getSuccessRespEvents(batchedRequest, subscriptionMetadataArray, destination, true),
);
eventChunks.forEach((chunk, index) => {
// get subscriptionProfiles for the chunk
const subscriptionProfileList = chunk.map((event) => event.payload?.profile);
// get metadata for this chunk
const metadataList = chunk.map((event) => event.metadata);
// get list of jobIds from the above metdadata
const jobIdList = metadataList.map((metadata) => metadata.jobId);
// push the jobId: index to metadataIndex mapping which let us know the metadata respective payload index position in batched request
jobIdList.forEach((jobId) => {
metadataIndexMap[jobId] = index;
});
profileSubscriptionAndMetadataArr.push({
subscription: { subscriptionProfileList, listId },
metadataList,
profiles: [],
});
});
batchedResponseList.push(...batchedResponse);
});
const profiles = getProfiles(profileRespList, subscriptionMetadataArrayForAll, destination);
profileSubscriptionAndMetadataArr = populateArrWithRespectiveProfileData(
profileSubscriptionAndMetadataArr,
metadataIndexMap,
profileRespList,
);
/* Till this point I have a profileSubscriptionAndMetadataArr
containing the the events in one object for which batching has to happen in following format
[
{
subscription: { subscriptionProfileList, listId1 },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
metadataList2,
},
{
metadataList3,
profiles: [respectiveProfiles for above metadata with no subscription]
}
]
*/
return buildRequestsForProfileSubscriptionAndMetadataArr(
profileSubscriptionAndMetadataArr,
destination,
);
/* for identify calls with batching batched with identify with no batching
we will sonctruct O/P as:
[
[2 calls for identifywith batching],
[1 call identify calls with batching]
]
*/
};

return [...profiles, ...batchedResponseList];
module.exports = {
groupSubscribeResponsesUsingListIdV2,
populateArrWithRespectiveProfileData,
generateBatchedSubscriptionRequest,
batchRequestV2,
};
module.exports = { batchSubscriptionRequestV2, groupSubscribeResponsesUsingListIdV2 };
Loading

0 comments on commit c9a4837

Please sign in to comment.