Skip to content

Commit

Permalink
fix: seperate handling flows of record and audiencelist event
Browse files Browse the repository at this point in the history
  • Loading branch information
yashasvibajpai committed Nov 16, 2023
1 parent 1aee8cb commit 29e6913
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 341 deletions.
36 changes: 28 additions & 8 deletions src/v0/destinations/marketo_static_list/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const {
handleRtTfSingleEventError,
} = require('../../util');
const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant');
const { getIds, validateMessageType, transformForRecordEvent } = require('./util');
const { getIds, validateMessageType } = require('./util');
const {
getDestinationExternalID,
defaultRequestConfig,
Expand All @@ -19,6 +19,7 @@ const { formatConfig, MAX_LEAD_IDS_SIZE } = require('./config');
const Cache = require('../../util/cache');
const { getAuthToken } = require('../marketo/transform');
const { InstrumentationError, UnauthorizedError } = require('../../util/errorTypes');
const { processRecordInputs } = require('./transformV2');

const authCache = new Cache(AUTH_CACHE_TTL); // 1 hr

Expand Down Expand Up @@ -156,20 +157,36 @@ const processRouterDest = async (inputs, reqMetadata) => {
// sent with status code 500 to the router to be retried.
const tokenisedInputs = inputs.map((input) => ({ ...input, token }));
// use lodash.groupby to group the inputs based on message type
let transformedRecordEvent = [];
const transformedRecordEvent = [];
let transformedAudienceEvent = [];
const groupedInputs = lodash.groupBy(tokenisedInputs, (input) => input.message.type);

const respList = [];
// process record events
if (groupedInputs.record && groupedInputs.record.length > 0) {
const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record);
respList.push(...recordToAudienceTransformationOutput.errorArr);
transformedRecordEvent = await triggerProcess(
recordToAudienceTransformationOutput.transformedAudienceEvent,
processEvent,
reqMetadata,
const groupedRecordInputs = groupedInputs.record;
const { staticListId } = groupedRecordInputs[0].destination.Config;
const externalIdGroupedRecordInputs = lodash.groupBy(
groupedRecordInputs,
(input) =>
getDestinationExternalID(input.message, 'MARKETO_STATIC_LIST-leadId') || staticListId,
);
Object.keys(externalIdGroupedRecordInputs).forEach((key) => {
const transformedGroupedRecordEvent = processRecordInputs(externalIdGroupedRecordInputs[key]);
transformedRecordEvent.push(transformedGroupedRecordEvent);
});

// old modular code
// transformedRecordEvent = processRecordInputs(groupedInputs.record, reqMetadata);
// const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record);
// respList.push(...recordToAudienceTransformationOutput.errorArr);
// transformedRecordEvent = await triggerProcess(
// recordToAudienceTransformationOutput.transformedAudienceEvent,
// processEvent,
// reqMetadata,
// );
}
// process audiencelist events
if (groupedInputs.audiencelist && groupedInputs.audiencelist.length > 0) {
transformedAudienceEvent = await triggerProcess(
groupedInputs.audiencelist,
Expand Down Expand Up @@ -198,7 +215,10 @@ function processMetadataForRouter(output) {

module.exports = {
process,
processEvent,
processRouterDest,
processMetadataForRouter,
authCache,
triggerProcess,
batchResponseBuilder,
};
111 changes: 111 additions & 0 deletions src/v0/destinations/marketo_static_list/transformV2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
const lodash = require('lodash');
const {
defaultPostRequestConfig,
defaultDeleteRequestConfig,
getDestinationExternalID,
defaultRequestConfig,
getSuccessRespEvents,
} = require('../../util');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { MAX_LEAD_IDS_SIZE } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');

/**
* Generates the final response structure to be sent to the destination
* @param {*} endPoint
* @param {*} leadIds
* @param {*} operation
* @param {*} token
* @returns batched response
*/
const responseBuilder = (endPoint, leadIds, operation, token) => {
let updatedEndpoint = endPoint;
if (leadIds.length > 0) {
leadIds.forEach((id) => {
updatedEndpoint = `${updatedEndpoint}id=${id}&`;
});
}
updatedEndpoint = updatedEndpoint.slice(0, -1);
const response = defaultRequestConfig();
response.endpoint = updatedEndpoint;
if (operation === 'insert') {
response.method = defaultPostRequestConfig.requestMethod;
} else {
response.method = defaultDeleteRequestConfig.requestMethod;
}
response.headers = {
Authorization: `Bearer ${token}`,
'Content-Type': JSON_MIME_TYPE,
};
return response;
};

/**
* The function is responsible for building the batched response for a given set of record inputs.
* @param {*} groupedRecordInputs
* @param {*} Config
* @param {*} token
* @param {*} leadIds
* @param {*} operation
* @returns an array of response objects, where each object represents a batched response for a chunk of lead IDs.
*/
const batchResponseBuilder = (groupedRecordInputs, Config, token, leadIds, operation) => {
const { accountId, staticListId } = Config;
const { message } = groupedRecordInputs[0];
const listId = getDestinationExternalID(message, 'MARKETO_STATIC_LIST-leadId') || staticListId;
const endpoint = `https://${accountId}.mktorest.com/rest/v1/lists/${listId}/leads.json?`;
if (!listId) {
throw new InstrumentationError('No static listId is provided');
}
const response = [];
const leadIdsChunks = lodash.chunk(leadIds, MAX_LEAD_IDS_SIZE);
leadIdsChunks.forEach((ids) => {
response.push(responseBuilder(endpoint, ids, operation, token));
});
return response;
};

/**
* A function that processes a list of grouped record inputs.
* It iterates through each input and groups the field IDs based on the action.
* @param {*} groupedRecordInputs
* @returns An array containing the batched responses for the insert and delete actions along with the metadata.
*/
function processRecordInputs(groupedRecordInputs) {
// iterate through each input and group field id based on action
const insertFields = [];
const deleteFields = [];
const finalMetadata = [];
const { Config } = groupedRecordInputs[0].destination;
groupedRecordInputs.forEach((input) => {
const { fields, action } = input.message;
const fieldsId = fields.id;
if (action === 'insert') {
insertFields.push(fieldsId);
finalMetadata.push(input.metadata);
} else if (action === 'delete') {
deleteFields.push(fieldsId);
finalMetadata.push(input.metadata);
}
});
const deleteResponse = batchResponseBuilder(
groupedRecordInputs,
Config,
groupedRecordInputs[0].token,
deleteFields,
'delete',
);
const insertResponse = batchResponseBuilder(
groupedRecordInputs,
Config,
groupedRecordInputs[0].token,
insertFields,
'insert',
);
const batchedResponse = [...deleteResponse, ...insertResponse];
return getSuccessRespEvents(batchedResponse, finalMetadata, groupedRecordInputs[0].destination);
}

module.exports = {
processRecordInputs,
};
90 changes: 0 additions & 90 deletions src/v0/destinations/marketo_static_list/util.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
/* eslint-disable unicorn/consistent-destructuring */
const { getDestinationExternalID, isDefinedAndNotNull, getErrorRespEvents } = require('../../util');
const { InstrumentationError } = require('../../util/errorTypes');
const tags = require('../../util/tags');

/**
* Fetches the ids from the array of objects
Expand Down Expand Up @@ -37,95 +35,7 @@ const validateMessageType = (message, allowedTypes) => {
}
};

function transformForRecordEvent(inputs) {
const successMetadataList = [];
const { message, destination } = inputs[0];
const { staticListId } = destination.Config;
// TODO: Rethink this process
const mslExternalId = getDestinationExternalID(message, 'marketoStaticListId') || staticListId;
// Skeleton for audience message
const transformedAudienceMessage = {
type: 'audiencelist',
context: {
externalId: [
{
type: 'marketoStaticListId',
value: mslExternalId,
},
],
},
properties: {
listData: {
add: [],
remove: [],
},
},
};

// group input based on presence of field id
const groupedInputs = inputs.reduce(
(acc, input) => {
const { fields } = input.message;
const fieldsId = fields?.id;
if (isDefinedAndNotNull(fieldsId)) {
acc[0].push(input);
} else {
acc[1].push(input);
}
return acc;
},
[[], []],
);

// if there are no inputs with field id, then throw error
if (groupedInputs[0].length === 0) {
throw new InstrumentationError('No field id passed in the payload.');
}

// handle error for inputs with no field id
const errorArr = groupedInputs[1].map((input) =>
getErrorRespEvents(input.metadata, 400, 'No field id passed in the payload', {
[tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION,
[tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION,
}),
);

// handle for success case
groupedInputs[0].forEach((input) => {
const { fields, action } = input.message;
const fieldsId = fields.id;
if (action === 'insert') {
transformedAudienceMessage.properties.listData.add.push({ id: fieldsId });
successMetadataList.push(input.metadata);
} else if (action === 'delete') {
transformedAudienceMessage.properties.listData.remove.push({ id: fieldsId });
successMetadataList.push(input.metadata);
} else {
errorArr.push(
getErrorRespEvents(input.metadata, 400, 'Invalid action type', {
[tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION,
[tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION,
}),
);
}
});

const transformedAudienceEvent = [
{
destination,
metadata: successMetadataList,
message: transformedAudienceMessage,
},
];

return {
errorArr,
transformedAudienceEvent,
};
}

module.exports = {
getIds,
validateMessageType,
transformForRecordEvent,
};
Loading

0 comments on commit 29e6913

Please sign in to comment.