Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: klaviyo onboard unsubscribe profile support #3646

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 64 additions & 27 deletions src/v0/destinations/klaviyo/batchUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ const groupSubscribeResponsesUsingListIdV2 = (subscribeResponseList) => {
/**
* This function takes susbscription as input and batches them into a single request body
* @param {subscription}
* subscription= {listId, subscriptionProfileList}
* subscription= {listId, subscriptionProfileList, operation}
* subscription.operation could be either subscribe or unsubscribe
*/
const generateBatchedSubscriptionRequest = (subscription, destination) => {
const subscriptionPayloadResponse = defaultRequestConfig();
// fetching listId from first event as listId is same for all the events
const profiles = []; // list of profiles to be subscribed
const { listId, subscriptionProfileList } = subscription;
const { listId, subscriptionProfileList, operation } = subscription;
subscriptionProfileList.forEach((profileList) => profiles.push(...profileList));
subscriptionPayloadResponse.body.JSON = getSubscriptionPayload(listId, profiles);
subscriptionPayloadResponse.endpoint = `${BASE_ENDPOINT}/api/profile-subscription-bulk-create-jobs`;
subscriptionPayloadResponse.body.JSON = getSubscriptionPayload(listId, profiles, operation);
subscriptionPayloadResponse.endpoint = `${BASE_ENDPOINT}/api/${operation === 'subscribe' ? 'profile-subscription-bulk-create-jobs' : 'profile-subscription-bulk-delete-jobs'}`;
subscriptionPayloadResponse.headers = {
Authorization: `Klaviyo-API-Key ${destination.Config.privateApiKey}`,
'Content-Type': JSON_MIME_TYPE,
Expand Down Expand Up @@ -90,12 +91,12 @@ const populateArrWithRespectiveProfileData = (
* ex:
* profileSubscriptionAndMetadataArr = [
{
subscription: { subscriptionProfileList, listId1 },
subscription: { subscriptionProfileList, listId1, operation },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
subscription: { subscriptionProfile List With No Profiles, listId2, operation },
metadataList2,
},
{
Expand All @@ -107,18 +108,15 @@ const populateArrWithRespectiveProfileData = (
* @param {*} destination
* @returns
*/
const buildRequestsForProfileSubscriptionAndMetadataArr = (
profileSubscriptionAndMetadataArr,
destination,
) => {
const buildProfileAndSubscriptionRequests = (profileSubscriptionAndMetadataArr, destination) => {
const finalResponseList = [];
profileSubscriptionAndMetadataArr.forEach((profileSubscriptionData) => {
const batchedRequest = [];
// we are keeping profiles request prior to subscription ones as first profile creation and then subscription should happen
if (profileSubscriptionData.profiles?.length > 0) {
batchedRequest.push(...getProfileRequests(profileSubscriptionData.profiles, destination));
}

// following condition ensures if no subscriptions are present we won't build subscription payload
if (profileSubscriptionData.subscription?.subscriptionProfileList?.length > 0) {
batchedRequest.push(
generateBatchedSubscriptionRequest(profileSubscriptionData.subscription, destination),
Expand All @@ -132,46 +130,88 @@ const buildRequestsForProfileSubscriptionAndMetadataArr = (
return finalResponseList;
};

const batchRequestV2 = (subscribeRespList, profileRespList, destination) => {
const subscribeEventGroups = groupSubscribeResponsesUsingListIdV2(subscribeRespList);
let profileSubscriptionAndMetadataArr = [];
const metaDataIndexMap = new Map();
/**
* This function updates the profileSubscriptionAndMetadataArr array with the subscription requests
* @param {*} subscribeStatusList
* @param {*} profilesList
* @param {*} operation
* @param {*} profileSubscriptionAndMetadataArr
* @param {*} metaDataIndexMap
*/
const updateArrWithSubscriptions = (
subscribeStatusList,
profilesList,
operation,
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
) => {
const subscribeEventGroups = groupSubscribeResponsesUsingListIdV2(subscribeStatusList);

Object.keys(subscribeEventGroups).forEach((listId) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(subscribeEventGroups[listId], MAX_BATCH_SIZE);
eventChunks.forEach((chunk, index) => {
eventChunks.forEach((chunk) => {
// get subscriptionProfiles for the chunk
const subscriptionProfileList = chunk.map((event) => event.payload?.profile);
// get metadata for this chunk
const metadataList = chunk.map((event) => event.metadata);
// get list of jobIds from the above metdadata
const jobIdList = metadataList.map((metadata) => metadata.jobId);
// using length as index
const index = profileSubscriptionAndMetadataArr.length;
// push the jobId: index to metadataIndex mapping which let us know the metadata respective payload index position in batched request
jobIdList.forEach((jobId) => {
metaDataIndexMap.set(jobId, index);
});
profileSubscriptionAndMetadataArr.push({
subscription: { subscriptionProfileList, listId },
subscription: { subscriptionProfileList, listId, operation },
metadataList,
profiles: [],
});
});
});
profileSubscriptionAndMetadataArr = populateArrWithRespectiveProfileData(
};

/**
* This function performs batching for the subscription and unsubscription requests and attaches respective profile request as well if present
* @param {*} subscribeList
* @param {*} unsubscribeList
* @param {*} profilesList
* @param {*} destination
* @returns
*/
const batchRequestV2 = (subscribeList, unsubscribeList, profilesList, destination) => {
const profileSubscriptionAndMetadataArr = [];
const metaDataIndexMap = new Map();
updateArrWithSubscriptions(
subscribeList,
profilesList,
'subscribe',
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
profileRespList,
);
updateArrWithSubscriptions(
unsubscribeList,
profilesList,
'unsubscribe',
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
);
const subscriptionsAndProfileArr = populateArrWithRespectiveProfileData(
profileSubscriptionAndMetadataArr,
metaDataIndexMap,
profilesList,
);
/* Till this point I have a profileSubscriptionAndMetadataArr
containing the the events in one object for which batching has to happen in following format
[
{
subscription: { subscriptionProfileList, listId1 },
subscription: { subscriptionProfileList, listId1, operation },
metadataList1,
profiles: [respectiveProfiles for above metadata]
},
{
subscription: { subscriptionProfile List With No Profiles, listId2 },
subscription: { subscriptionProfile List With No Profiles, listId2, operation },
metadataList2,
},
{
Expand All @@ -180,14 +220,11 @@ const batchRequestV2 = (subscribeRespList, profileRespList, destination) => {
}
]
*/
return buildRequestsForProfileSubscriptionAndMetadataArr(
profileSubscriptionAndMetadataArr,
destination,
);
return buildProfileAndSubscriptionRequests(subscriptionsAndProfileArr, destination);
/* for identify calls with batching batched with identify with no batching
we will sonctruct O/P as:
we will construct O/P as:
[
[2 calls for identifywith batching],
[2 calls for identify with batching],
[1 call identify calls with batching]
]
*/
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/klaviyo/batchUtil.test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { OperatorType } = require('@rudderstack/json-template-engine');
const {
groupSubscribeResponsesUsingListIdV2,
populateArrWithRespectiveProfileData,
Expand Down Expand Up @@ -94,6 +95,7 @@ describe('generateBatchedSubscriptionRequest', () => {
const subscription = {
listId: 'test-list-id',
subscriptionProfileList: [[{ id: 'profile1' }, { id: 'profile2' }], [{ id: 'profile3' }]],
operation: 'subscribe',
};
const destination = {
Config: {
Expand Down Expand Up @@ -144,6 +146,7 @@ describe('generateBatchedSubscriptionRequest', () => {
const subscription = {
listId: 'test-list-id',
subscriptionProfileList: [],
operation: 'subscribe',
};
const destination = {
Config: {
Expand Down
5 changes: 5 additions & 0 deletions src/v0/destinations/klaviyo/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const CONFIG_CATEGORIES = {
VIEWED_PRODUCT: { name: 'ViewedProduct' },
ADDED_TO_CART: { name: 'AddedToCart' },
ITEMS: { name: 'Items' },
SUBSCRIBE: { name: 'KlaviyoProfileV2', apiUrl: '/api/profile-subscription-bulk-create-jobs' },
UNSUBSCRIBE: {
name: 'KlaviyoProfileV2',
apiUrl: '/api/profile-subscription-bulk-delete-jobs',
},
};
const ecomExclusionKeys = [
'name',
Expand Down
62 changes: 46 additions & 16 deletions src/v0/destinations/klaviyo/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
const { CONFIG_CATEGORIES, MAPPING_CONFIG } = require('./config');
const {
constructProfile,
subscribeUserToListV2,
subscribeOrUnsubscribeUserToListV2,
buildRequest,
buildSubscriptionRequest,
buildSubscriptionOrUnsubscriptionPayload,
getTrackRequests,
fetchTransformedEvents,
addSubscribeFlagToTraits,
Expand All @@ -24,6 +24,7 @@
adduserIdFromExternalId,
groupEventsByType,
flattenJson,
isDefinedAndNotNull,
} = require('../../util');

/**
Expand All @@ -49,9 +50,17 @@
}
const payload = removeUndefinedAndNullValues(constructProfile(message, destination, true));
const response = { profile: payload };
// check if user wants to subscribe profile or not and listId is present or not
if (traitsInfo?.properties?.subscribe && (traitsInfo.properties?.listId || listId)) {
response.subscription = subscribeUserToListV2(message, traitsInfo, destination);
// check if user wants to subscribe/unsubscribe profile or do nothing and listId is present or not
if (
isDefinedAndNotNull(traitsInfo?.properties?.subscribe) &&
(traitsInfo.properties?.listId || listId)
) {
response.subscription = subscribeOrUnsubscribeUserToListV2(
message,
traitsInfo,
destination,
traitsInfo.properties.subscribe ? 'subscribe' : 'unsubscribe',
);
}
return response;
};
Expand Down Expand Up @@ -93,7 +102,7 @@
};

/**
* Main handlerfunc for group request add/subscribe users to the list based on property sent
* Main handlerfunc for group request add/subscribe to or remove/delete users to the list based on property sent
* DOCS:https://developers.klaviyo.com/en/reference/subscribe_profiles
* @param {*} message
* @param {*} category
Expand All @@ -105,11 +114,17 @@
throw new InstrumentationError('groupId is a required field for group events');
}
const traitsInfo = getFieldValueFromMessage(message, 'traits');
if (!traitsInfo?.subscribe) {
throw new InstrumentationError('Subscribe flag should be true for group call');
if (!isDefinedAndNotNull(traitsInfo?.subscribe)) {
throw new InstrumentationError('Subscribe flag should be included in group call');

Check warning on line 118 in src/v0/destinations/klaviyo/transformV2.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/klaviyo/transformV2.js#L118

Added line #L118 was not covered by tests
}
// throwing error for subscribe flag
return { subscription: subscribeUserToListV2(message, traitsInfo, destination) };
return {
subscription: subscribeOrUnsubscribeUserToListV2(
message,
traitsInfo,
destination,
traitsInfo.subscribe ? 'subscribe' : 'unsubscribe',
),
};
};

const processEvent = (event) => {
Expand Down Expand Up @@ -152,9 +167,7 @@
respList.push(buildRequest(response.profile, destination, CONFIG_CATEGORIES.IDENTIFYV2));
}
if (response.subscription) {
respList.push(
buildSubscriptionRequest(response.subscription, destination, CONFIG_CATEGORIES.TRACKV2),
);
respList.push(buildSubscriptionOrUnsubscriptionPayload(response.subscription, destination));
}
if (response.event) {
respList.push(buildRequest(response.event, destination, CONFIG_CATEGORIES.TRACKV2));
Expand All @@ -163,9 +176,19 @@
};

// This function separates subscribe, proifle and event responses from process () and other responses in chunks
const getEventChunks = (input, subscribeRespList, profileRespList, eventRespList) => {
const getEventChunks = (
input,
subscribeRespList,
profileRespList,
eventRespList,
unsubscriptionList,
) => {
if (input.payload.subscription) {
subscribeRespList.push({ payload: input.payload.subscription, metadata: input.metadata });
if (input.payload.subscription.operation === 'subscribe') {
subscribeRespList.push({ payload: input.payload.subscription, metadata: input.metadata });
} else {
unsubscriptionList.push({ payload: input.payload.subscription, metadata: input.metadata });
}
}
if (input.payload.profile) {
profileRespList.push({ payload: input.payload.profile, metadata: input.metadata });
Expand All @@ -179,6 +202,7 @@
const batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
const unsubscriptionList = [];
const profileRespList = [];
const eventRespList = [];
const { destination } = inputs[0];
Expand All @@ -197,14 +221,20 @@
subscribeRespList,
profileRespList,
eventRespList,
unsubscriptionList,
);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
});
const batchedResponseList = batchRequestV2(subscribeRespList, profileRespList, destination);
const batchedResponseList = batchRequestV2(
subscribeRespList,
unsubscriptionList,
profileRespList,
destination,
);
const trackRespList = getTrackRequests(eventRespList, destination);

batchResponseList.push(...trackRespList, ...batchedResponseList);
Expand Down
Loading
Loading