Skip to content

Commit

Permalink
fix: update implementation for record event handle
Browse files Browse the repository at this point in the history
  • Loading branch information
utsabc committed Nov 17, 2023
1 parent 29e6913 commit 6ab439e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 85 deletions.
72 changes: 19 additions & 53 deletions src/v0/destinations/marketo_static_list/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const {
defaultPostRequestConfig,
defaultDeleteRequestConfig,
generateErrorObject,
checkInvalidRtTfEvents,
getSuccessRespEvents,
handleRtTfSingleEventError,
simpleProcessRouterDest,
} = require('../../util');
const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant');
const { getIds, validateMessageType } = require('./util');
Expand Down Expand Up @@ -108,38 +106,13 @@ const process = async (event, _processParams) => {
return response;
};

const triggerProcess = async (inputs, reqMetadata, processParams) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const respList = await Promise.all(
inputs.map(async (input) => {
try {
let resp = input.message;
// transform if not already done
if (!input.message.statusCode) {
resp = await process(input, processParams);
}
if (Array.isArray(input.metadata)) {
return getSuccessRespEvents(resp, [...input.metadata], input.destination);
}
return getSuccessRespEvents(resp, [input.metadata], input.destination);
} catch (error) {
return handleRtTfSingleEventError(input, error, reqMetadata);
}
}),
);
return respList;
};

const processRouterDest = async (inputs, reqMetadata) => {
// Token needs to be generated for marketo which will be done on input level.
// If destination information is not present Error should be thrown
let token;

const { destination } = inputs[0];
try {
token = await getAuthToken(formatConfig(inputs[0].destination));
const token = await getAuthToken(formatConfig(destination));
if (!token) {
throw new UnauthorizedError('Could not retrieve authorisation token');
}
Expand All @@ -152,45 +125,38 @@ const processRouterDest = async (inputs, reqMetadata) => {
return errResponses;
}

// Checking previous status Code. Initially setting to false.
// If true then previous status is 500 and every subsequent event output should be
// sent with status code 500 to the router to be retried.
const tokenisedInputs = inputs.map((input) => ({ ...input, token }));
// use lodash.groupby to group the inputs based on message type
const transformedRecordEvent = [];
let transformedAudienceEvent = [];
const groupedInputs = lodash.groupBy(tokenisedInputs, (input) => input.message.type);
const groupedInputs = lodash.groupBy(inputs, (input) => input.message.type);

const respList = [];
// process record events
if (groupedInputs.record && groupedInputs.record.length > 0) {
if (Array.isArray(groupedInputs.record) && groupedInputs.record.length > 0) {
const groupedRecordInputs = groupedInputs.record;
const { staticListId } = groupedRecordInputs[0].destination.Config;
const { staticListId } = destination.Config;
const externalIdGroupedRecordInputs = lodash.groupBy(
groupedRecordInputs,
(input) =>
getDestinationExternalID(input.message, 'MARKETO_STATIC_LIST-leadId') || staticListId,
);
Object.keys(externalIdGroupedRecordInputs).forEach((key) => {
const transformedGroupedRecordEvent = processRecordInputs(externalIdGroupedRecordInputs[key]);
transformedRecordEvent.push(transformedGroupedRecordEvent);
});
const alltransformedGroupedRecordEvent = await Promise.all(
Object.keys(externalIdGroupedRecordInputs).map(async (key) => {
const transformedGroupedRecordEvent = await processRecordInputs(
externalIdGroupedRecordInputs[key],
destination,
);
return transformedGroupedRecordEvent;
}),
);

// old modular code
// transformedRecordEvent = processRecordInputs(groupedInputs.record, reqMetadata);
// const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record);
// respList.push(...recordToAudienceTransformationOutput.errorArr);
// transformedRecordEvent = await triggerProcess(
// recordToAudienceTransformationOutput.transformedAudienceEvent,
// processEvent,
// reqMetadata,
// );
transformedRecordEvent.push(alltransformedGroupedRecordEvent.flat());
}
// process audiencelist events
if (groupedInputs.audiencelist && groupedInputs.audiencelist.length > 0) {
transformedAudienceEvent = await triggerProcess(
transformedAudienceEvent = await simpleProcessRouterDest(
groupedInputs.audiencelist,
processEvent,
process,
reqMetadata,
);
}
Expand Down
88 changes: 56 additions & 32 deletions src/v0/destinations/marketo_static_list/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ const {
getDestinationExternalID,
defaultRequestConfig,
getSuccessRespEvents,
isDefinedAndNotNull,
generateErrorObject,
getErrorRespEvents,
} = require('../../util');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { MAX_LEAD_IDS_SIZE } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { InstrumentationError, UnauthorizedError } = require('../../util/errorTypes');
const { getAuthToken } = require('../marketo/transform');
const { formatConfig } = require('../marketo/config');

/**
* Generates the final response structure to be sent to the destination
Expand Down Expand Up @@ -49,14 +54,9 @@ const responseBuilder = (endPoint, leadIds, operation, token) => {
* @param {*} operation
* @returns an array of response objects, where each object represents a batched response for a chunk of lead IDs.
*/
const batchResponseBuilder = (groupedRecordInputs, Config, token, leadIds, operation) => {
const { accountId, staticListId } = Config;
const { message } = groupedRecordInputs[0];
const listId = getDestinationExternalID(message, 'MARKETO_STATIC_LIST-leadId') || staticListId;
const batchResponseBuilder = (listId, Config, token, leadIds, operation) => {
const { accountId } = Config;
const endpoint = `https://${accountId}.mktorest.com/rest/v1/lists/${listId}/leads.json?`;
if (!listId) {
throw new InstrumentationError('No static listId is provided');
}
const response = [];
const leadIdsChunks = lodash.chunk(leadIds, MAX_LEAD_IDS_SIZE);
leadIdsChunks.forEach((ids) => {
Expand All @@ -71,39 +71,63 @@ const batchResponseBuilder = (groupedRecordInputs, Config, token, leadIds, opera
* @param {*} groupedRecordInputs
* @returns An array containing the batched responses for the insert and delete actions along with the metadata.
*/
function processRecordInputs(groupedRecordInputs) {
async function processRecordInputs(groupedRecordInputs, destination) {
const token = await getAuthToken(formatConfig(destination));
if (!token) {
throw new UnauthorizedError('Authorization failed');
}
const { Config } = destination;
const listId =
getDestinationExternalID(groupedRecordInputs[0].message, 'MARKETO_STATIC_LIST-leadId') ||
Config.staticListId;

// iterate through each input and group field id based on action
const insertFields = [];
const deleteFields = [];
const finalMetadata = [];
const { Config } = groupedRecordInputs[0].destination;
const successMetadataForInsert = [];
const successMetadataForDelete = [];
const errorMetadata = [];

groupedRecordInputs.forEach((input) => {
const { fields, action } = input.message;
const fieldsId = fields.id;
if (action === 'insert') {
insertFields.push(fieldsId);
finalMetadata.push(input.metadata);
} else if (action === 'delete') {
deleteFields.push(fieldsId);
finalMetadata.push(input.metadata);
const fieldId = fields.id;
if (action === 'insert' && isDefinedAndNotNull(fieldId)) {
insertFields.push(fieldId);
successMetadataForInsert.push(input.metadata);
} else if (action === 'delete' && isDefinedAndNotNull(fieldId)) {
deleteFields.push(fieldId);
successMetadataForDelete.push(input.metadata);
} else {
errorMetadata.push(input.metadata);
}
});
const deleteResponse = batchResponseBuilder(
groupedRecordInputs,
Config,
groupedRecordInputs[0].token,
deleteFields,
'delete',
const deletePayloads = batchResponseBuilder(listId, Config, token, deleteFields, 'delete');

const deleteResponse = getSuccessRespEvents(
deletePayloads,
successMetadataForDelete,
destination,
true,
);
const insertResponse = batchResponseBuilder(
groupedRecordInputs,
Config,
groupedRecordInputs[0].token,
insertFields,
'insert',

const insertPayloads = batchResponseBuilder(listId, Config, token, insertFields, 'insert');

const insertResponse = getSuccessRespEvents(
insertPayloads,
successMetadataForInsert,
destination,
true,
);

const error = new InstrumentationError(
'Invalid leadIds format or no leadIds found neither to add nor to remove',
);
const batchedResponse = [...deleteResponse, ...insertResponse];
return getSuccessRespEvents(batchedResponse, finalMetadata, groupedRecordInputs[0].destination);
const errorObj = generateErrorObject(error);
const errorResponseList = errorMetadata.map((metadata) =>
getErrorRespEvents(metadata, errorObj.status, errorObj.message, errorObj.statTags),
);

return [deleteResponse, insertResponse, ...errorResponseList];
}

module.exports = {
Expand Down

0 comments on commit 6ab439e

Please sign in to comment.