Skip to content

Commit

Permalink
Merge branch 'develop' into GT/add-identify-source
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavowt authored Aug 19, 2024
2 parents 8abc579 + 474f2bd commit e55395c
Show file tree
Hide file tree
Showing 8 changed files with 1,143 additions and 76 deletions.
91 changes: 64 additions & 27 deletions src/v0/destinations/klaviyo/batchUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ const groupSubscribeResponsesUsingListIdV2 = (subscribeResponseList) => {
/**
* This function takes susbscription as input and batches them into a single request body
* @param {subscription}
* subscription= {listId, subscriptionProfileList}
* subscription= {listId, subscriptionProfileList, operation}
* subscription.operation could be either subscribe or unsubscribe
*/
const generateBatchedSubscriptionRequest = (subscription, destination) => {
const subscriptionPayloadResponse = defaultRequestConfig();
// fetching listId from first event as listId is same for all the events
const profiles = []; // list of profiles to be subscribed
const { listId, subscriptionProfileList } = subscription;
const { listId, subscriptionProfileList, operation } = subscription;
subscriptionProfileList.forEach((profileList) => profiles.push(...profileList));
subscriptionPayloadResponse.body.JSON = getSubscriptionPayload(listId, profiles);
subscriptionPayloadResponse.endpoint = `${BASE_ENDPOINT}/api/profile-subscription-bulk-create-jobs`;
subscriptionPayloadResponse.body.JSON = getSubscriptionPayload(listId, profiles, operation);
subscriptionPayloadResponse.endpoint = `${BASE_ENDPOINT}/api/${operation === 'subscribe' ? 'profile-subscription-bulk-create-jobs' : 'profile-subscription-bulk-delete-jobs'}`;
subscriptionPayloadResponse.headers = {
Authorization: `Klaviyo-API-Key ${destination.Config.privateApiKey}`,
'Content-Type': JSON_MIME_TYPE,
Expand Down Expand Up @@ -90,12 +91,12 @@ const populateArrWithRespectiveProfileData = (
* ex:
* profileSubscriptionAndMetadataArr = [
{
subscription: { subscriptionProfileList, listId1 },
subscription: { subscriptionProfileList, listId1, operation },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
subscription: { subscriptionProfile List With No Profiles, listId2, operation },
metadataList2,
},
{
Expand All @@ -107,18 +108,15 @@ const populateArrWithRespectiveProfileData = (
* @param {*} destination
* @returns
*/
const buildRequestsForProfileSubscriptionAndMetadataArr = (
profileSubscriptionAndMetadataArr,
destination,
) => {
const buildProfileAndSubscriptionRequests = (profileSubscriptionAndMetadataArr, destination) => {
const finalResponseList = [];
profileSubscriptionAndMetadataArr.forEach((profileSubscriptionData) => {
const batchedRequest = [];
// we are keeping profiles request prior to subscription ones as first profile creation and then subscription should happen
if (profileSubscriptionData.profiles?.length > 0) {
batchedRequest.push(...getProfileRequests(profileSubscriptionData.profiles, destination));
}

// following condition ensures if no subscriptions are present we won't build subscription payload
if (profileSubscriptionData.subscription?.subscriptionProfileList?.length > 0) {
batchedRequest.push(
generateBatchedSubscriptionRequest(profileSubscriptionData.subscription, destination),
Expand All @@ -132,46 +130,88 @@ const buildRequestsForProfileSubscriptionAndMetadataArr = (
return finalResponseList;
};

const batchRequestV2 = (subscribeRespList, profileRespList, destination) => {
const subscribeEventGroups = groupSubscribeResponsesUsingListIdV2(subscribeRespList);
let profileSubscriptionAndMetadataArr = [];
const metaDataIndexMap = new Map();
/**
* This function updates the profileSubscriptionAndMetadataArr array with the subscription requests
* @param {*} subscribeStatusList
* @param {*} profilesList
* @param {*} operation
* @param {*} profileSubscriptionAndMetadataArr
* @param {*} metaDataIndexMap
*/
const updateArrWithSubscriptions = (
subscribeStatusList,
profilesList,
operation,
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
) => {
const subscribeEventGroups = groupSubscribeResponsesUsingListIdV2(subscribeStatusList);

Object.keys(subscribeEventGroups).forEach((listId) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(subscribeEventGroups[listId], MAX_BATCH_SIZE);
eventChunks.forEach((chunk, index) => {
eventChunks.forEach((chunk) => {
// get subscriptionProfiles for the chunk
const subscriptionProfileList = chunk.map((event) => event.payload?.profile);
// get metadata for this chunk
const metadataList = chunk.map((event) => event.metadata);
// get list of jobIds from the above metdadata
const jobIdList = metadataList.map((metadata) => metadata.jobId);
// using length as index
const index = profileSubscriptionAndMetadataArr.length;
// push the jobId: index to metadataIndex mapping which let us know the metadata respective payload index position in batched request
jobIdList.forEach((jobId) => {
metaDataIndexMap.set(jobId, index);
});
profileSubscriptionAndMetadataArr.push({
subscription: { subscriptionProfileList, listId },
subscription: { subscriptionProfileList, listId, operation },
metadataList,
profiles: [],
});
});
});
profileSubscriptionAndMetadataArr = populateArrWithRespectiveProfileData(
};

/**
* This function performs batching for the subscription and unsubscription requests and attaches respective profile request as well if present
* @param {*} subscribeList
* @param {*} unsubscribeList
* @param {*} profilesList
* @param {*} destination
* @returns
*/
const batchRequestV2 = (subscribeList, unsubscribeList, profilesList, destination) => {
const profileSubscriptionAndMetadataArr = [];
const metaDataIndexMap = new Map();
updateArrWithSubscriptions(
subscribeList,
profilesList,
'subscribe',
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
profileRespList,
);
updateArrWithSubscriptions(
unsubscribeList,
profilesList,
'unsubscribe',
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
);
const subscriptionsAndProfileArr = populateArrWithRespectiveProfileData(
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
profilesList,
);
/* Till this point I have a profileSubscriptionAndMetadataArr
containing the the events in one object for which batching has to happen in following format
[
{
subscription: { subscriptionProfileList, listId1 },
subscription: { subscriptionProfileList, listId1, operation },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
subscription: { subscriptionProfile List With No Profiles, listId2, operation },
metadataList2,
},
{
Expand All @@ -180,14 +220,11 @@ const batchRequestV2 = (subscribeRespList, profileRespList, destination) => {
}
]
*/
return buildRequestsForProfileSubscriptionAndMetadataArr(
profileSubscriptionAndMetadataArr,
destination,
);
return buildProfileAndSubscriptionRequests(subscriptionsAndProfileArr, destination);
/* for identify calls with batching batched with identify with no batching
we will sonctruct O/P as:
we will construct O/P as:
[
[2 calls for identifywith batching],
[2 calls for identify with batching],
[1 call identify calls with batching]
]
*/
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/klaviyo/batchUtil.test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { OperatorType } = require('@rudderstack/json-template-engine');
const {
groupSubscribeResponsesUsingListIdV2,
populateArrWithRespectiveProfileData,
Expand Down Expand Up @@ -94,6 +95,7 @@ describe('generateBatchedSubscriptionRequest', () => {
const subscription = {
listId: 'test-list-id',
subscriptionProfileList: [[{ id: 'profile1' }, { id: 'profile2' }], [{ id: 'profile3' }]],
operation: 'subscribe',
};
const destination = {
Config: {
Expand Down Expand Up @@ -144,6 +146,7 @@ describe('generateBatchedSubscriptionRequest', () => {
const subscription = {
listId: 'test-list-id',
subscriptionProfileList: [],
operation: 'subscribe',
};
const destination = {
Config: {
Expand Down
5 changes: 5 additions & 0 deletions src/v0/destinations/klaviyo/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const CONFIG_CATEGORIES = {
VIEWED_PRODUCT: { name: 'ViewedProduct' },
ADDED_TO_CART: { name: 'AddedToCart' },
ITEMS: { name: 'Items' },
SUBSCRIBE: { name: 'KlaviyoProfileV2', apiUrl: '/api/profile-subscription-bulk-create-jobs' },
UNSUBSCRIBE: {
name: 'KlaviyoProfileV2',
apiUrl: '/api/profile-subscription-bulk-delete-jobs',
},
};
const ecomExclusionKeys = [
'name',
Expand Down
62 changes: 46 additions & 16 deletions src/v0/destinations/klaviyo/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const { EventType, MappedToDestinationKey } = require('../../../constants');
const { CONFIG_CATEGORIES, MAPPING_CONFIG } = require('./config');
const {
constructProfile,
subscribeUserToListV2,
subscribeOrUnsubscribeUserToListV2,
buildRequest,
buildSubscriptionRequest,
buildSubscriptionOrUnsubscriptionPayload,
getTrackRequests,
fetchTransformedEvents,
addSubscribeFlagToTraits,
Expand All @@ -24,6 +24,7 @@ const {
adduserIdFromExternalId,
groupEventsByType,
flattenJson,
isDefinedAndNotNull,
} = require('../../util');

/**
Expand All @@ -49,9 +50,17 @@ const identifyRequestHandler = (message, category, destination) => {
}
const payload = removeUndefinedAndNullValues(constructProfile(message, destination, true));
const response = { profile: payload };
// check if user wants to subscribe profile or not and listId is present or not
if (traitsInfo?.properties?.subscribe && (traitsInfo.properties?.listId || listId)) {
response.subscription = subscribeUserToListV2(message, traitsInfo, destination);
// check if user wants to subscribe/unsubscribe profile or do nothing and listId is present or not
if (
isDefinedAndNotNull(traitsInfo?.properties?.subscribe) &&
(traitsInfo.properties?.listId || listId)
) {
response.subscription = subscribeOrUnsubscribeUserToListV2(
message,
traitsInfo,
destination,
traitsInfo.properties.subscribe ? 'subscribe' : 'unsubscribe',
);
}
return response;
};
Expand Down Expand Up @@ -93,7 +102,7 @@ const trackOrScreenRequestHandler = (message, category, destination) => {
};

/**
* Main handlerfunc for group request add/subscribe users to the list based on property sent
* Main handlerfunc for group request add/subscribe to or remove/delete users to the list based on property sent
* DOCS:https://developers.klaviyo.com/en/reference/subscribe_profiles
* @param {*} message
* @param {*} category
Expand All @@ -105,11 +114,17 @@ const groupRequestHandler = (message, category, destination) => {
throw new InstrumentationError('groupId is a required field for group events');
}
const traitsInfo = getFieldValueFromMessage(message, 'traits');
if (!traitsInfo?.subscribe) {
throw new InstrumentationError('Subscribe flag should be true for group call');
if (!isDefinedAndNotNull(traitsInfo?.subscribe)) {
throw new InstrumentationError('Subscribe flag should be included in group call');
}
// throwing error for subscribe flag
return { subscription: subscribeUserToListV2(message, traitsInfo, destination) };
return {
subscription: subscribeOrUnsubscribeUserToListV2(
message,
traitsInfo,
destination,
traitsInfo.subscribe ? 'subscribe' : 'unsubscribe',
),
};
};

const processEvent = (event) => {
Expand Down Expand Up @@ -152,9 +167,7 @@ const processV2 = (event) => {
respList.push(buildRequest(response.profile, destination, CONFIG_CATEGORIES.IDENTIFYV2));
}
if (response.subscription) {
respList.push(
buildSubscriptionRequest(response.subscription, destination, CONFIG_CATEGORIES.TRACKV2),
);
respList.push(buildSubscriptionOrUnsubscriptionPayload(response.subscription, destination));
}
if (response.event) {
respList.push(buildRequest(response.event, destination, CONFIG_CATEGORIES.TRACKV2));
Expand All @@ -163,9 +176,19 @@ const processV2 = (event) => {
};

// This function separates subscribe, proifle and event responses from process () and other responses in chunks
const getEventChunks = (input, subscribeRespList, profileRespList, eventRespList) => {
const getEventChunks = (
input,
subscribeRespList,
profileRespList,
eventRespList,
unsubscriptionList,
) => {
if (input.payload.subscription) {
subscribeRespList.push({ payload: input.payload.subscription, metadata: input.metadata });
if (input.payload.subscription.operation === 'subscribe') {
subscribeRespList.push({ payload: input.payload.subscription, metadata: input.metadata });
} else {
unsubscriptionList.push({ payload: input.payload.subscription, metadata: input.metadata });
}
}
if (input.payload.profile) {
profileRespList.push({ payload: input.payload.profile, metadata: input.metadata });
Expand All @@ -179,6 +202,7 @@ const processRouter = (inputs, reqMetadata) => {
const batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
const unsubscriptionList = [];
const profileRespList = [];
const eventRespList = [];
const { destination } = inputs[0];
Expand All @@ -197,14 +221,20 @@ const processRouter = (inputs, reqMetadata) => {
subscribeRespList,
profileRespList,
eventRespList,
unsubscriptionList,
);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
});
const batchedResponseList = batchRequestV2(subscribeRespList, profileRespList, destination);
const batchedResponseList = batchRequestV2(
subscribeRespList,
unsubscriptionList,
profileRespList,
destination,
);
const trackRespList = getTrackRequests(eventRespList, destination);

batchResponseList.push(...trackRespList, ...batchedResponseList);
Expand Down
Loading

0 comments on commit e55395c

Please sign in to comment.