Skip to content

Commit

Permalink
refactor transformation for record to audience list flow
Browse files Browse the repository at this point in the history
  • Loading branch information
utsabc committed Nov 9, 2023
1 parent 761a1f8 commit 1aee8cb
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 63 deletions.
82 changes: 50 additions & 32 deletions src/v0/destinations/marketo_static_list/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ const {
defaultPostRequestConfig,
defaultDeleteRequestConfig,
generateErrorObject,
checkInvalidRtTfEvents,
getSuccessRespEvents,
handleRtTfSingleEventError,
} = require('../../util');
const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant');
const { getIds, validateMessageType, transformForRecordEvent } = require('./util');
const {
getDestinationExternalID,
defaultRequestConfig,
getErrorRespEvents,
simpleProcessRouterDest,
} = require('../../util');
const { formatConfig, MAX_LEAD_IDS_SIZE } = require('./config');
const Cache = require('../../util/cache');
Expand Down Expand Up @@ -57,10 +59,10 @@ const batchResponseBuilder = (message, Config, token, leadIds, operation) => {
return response;
};

const processEvent = (input) => {
const { token, message, destination } = input;
const processEvent = (event) => {
const { token, message, destination } = event;
const { Config } = destination;
validateMessageType(message, ['audiencelist', 'record']);
validateMessageType(message, ['audiencelist']);
const response = [];
let toAdd;
let toRemove;
Expand Down Expand Up @@ -94,16 +96,43 @@ const processEvent = (input) => {
return response;
};

const process = async (event) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const process = async (event, _processParams) => {
const token = await getAuthToken(formatConfig(event.destination));

if (!token) {
throw new UnauthorizedError('Authorization failed');
}
const response = processEvent({ ...event, token });
const updatedEvent = { ...event, token };
const response = processEvent(updatedEvent);
return response;
};

const triggerProcess = async (inputs, reqMetadata, processParams) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const respList = await Promise.all(
inputs.map(async (input) => {
try {
let resp = input.message;
// transform if not already done
if (!input.message.statusCode) {
resp = await process(input, processParams);
}
if (Array.isArray(input.metadata)) {
return getSuccessRespEvents(resp, [...input.metadata], input.destination);
}
return getSuccessRespEvents(resp, [input.metadata], input.destination);
} catch (error) {
return handleRtTfSingleEventError(input, error, reqMetadata);
}
}),
);
return respList;
};

const processRouterDest = async (inputs, reqMetadata) => {
// Token needs to be generated for marketo which will be done on input level.
// If destination information is not present Error should be thrown
Expand All @@ -114,46 +143,41 @@ const processRouterDest = async (inputs, reqMetadata) => {
throw new UnauthorizedError('Could not retrieve authorisation token');
}
} catch (error) {
// Not using handleRtTfSingleEventError here as this is for multiple events
const errObj = generateErrorObject(error);
const respEvents = getErrorRespEvents(
inputs.map((input) => input.metadata),
errObj.status,
errObj.message,
errObj.statTags,
const errorObj = generateErrorObject(error);
const errResponses = inputs.map((input) =>
getErrorRespEvents(input.metadata, errorObj.status, errorObj.message, errorObj.statTags),
);
return [{ ...respEvents, destination: inputs?.[0]?.destination }];

return errResponses;
}

// Checking previous status Code. Initially setting to false.
// If true then previous status is 500 and every subsequent event output should be
// sent with status code 500 to the router to be retried.
const tokenisedInputs = inputs.map((input) => ({ ...input, token }));
const leadIdObj = {
insert: [],
delete: [],
};
// use lodash.groupby to group the inputs based on message type
let transformedRecordEvent = [];
let transformedAudienceEvent = [];
const groupedInputs = lodash.groupBy(tokenisedInputs, (input) => input.message.type);

const respList = [];
if (groupedInputs.record && groupedInputs.record.length > 0) {
const finalInputForRecordEvent = transformForRecordEvent(groupedInputs.record, leadIdObj);
transformedRecordEvent = await simpleProcessRouterDest(
finalInputForRecordEvent,
const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record);
respList.push(...recordToAudienceTransformationOutput.errorArr);
transformedRecordEvent = await triggerProcess(
recordToAudienceTransformationOutput.transformedAudienceEvent,
processEvent,
reqMetadata,
);
}
if (groupedInputs.audiencelist && groupedInputs.audiencelist.length > 0) {
transformedAudienceEvent = await simpleProcessRouterDest(
transformedAudienceEvent = await triggerProcess(
groupedInputs.audiencelist,
processEvent,
reqMetadata,
);
}
const respList = [...transformedRecordEvent, ...transformedAudienceEvent];
respList.push(...transformedRecordEvent, ...transformedAudienceEvent);
return respList;
};

Expand All @@ -164,16 +188,10 @@ const processRouterDest = async (inputs, reqMetadata) => {
*/
function processMetadataForRouter(output) {
const { metadata, destination } = output;
// check if metadata[0] is an array or not
let clonedMetadata;
if (Array.isArray(metadata[0])) {
[clonedMetadata] = cloneDeep(metadata);
} else {
clonedMetadata = cloneDeep(metadata);
}
const clonedMetadata = cloneDeep(metadata);
clonedMetadata.forEach((metadataElement) => {
// eslint-disable-next-line no-param-reassign
metadataElement.destInfo = { authKey: destination.ID };
metadataElement.destInfo = { authKey: destination?.ID };
});
return clonedMetadata;
}
Expand Down
112 changes: 81 additions & 31 deletions src/v0/destinations/marketo_static_list/util.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/* eslint-disable unicorn/consistent-destructuring */
const { getDestinationExternalID, isDefinedAndNotNull, getErrorRespEvents } = require('../../util');
const { InstrumentationError } = require('../../util/errorTypes');
const tags = require('../../util/tags');

/**
* Fetches the ids from the array of objects
Expand Down Expand Up @@ -35,43 +37,91 @@ const validateMessageType = (message, allowedTypes) => {
}
};

function transformForRecordEvent(inputs, leadIdObj) {
const finalMetadata = [];
// iterate through each inputs metadata and create a final metadata
const tokenisedInputs = inputs.map((input) => {
const { message } = input;
const { metadata } = input;
finalMetadata.push(metadata);
const { fields, action, type } = message;
if (type !== 'record') {
throw new InstrumentationError('Invalid message type, Supported message type is record.');
}
const { properties } = message;
if (!properties) {
message.properties = {};
}
message.properties.listData = message?.properties.listData || { add: [], remove: [] };
const fieldsId = fields?.id;
if (fieldsId === undefined) {
throw new InstrumentationError('No lead id passed in the payload.');
}
function transformForRecordEvent(inputs) {
const successMetadataList = [];
const { message, destination } = inputs[0];
const { staticListId } = destination.Config;
// TODO: Rethink this process
const mslExternalId = getDestinationExternalID(message, 'marketoStaticListId') || staticListId;
// Skeleton for audience message
const transformedAudienceMessage = {
type: 'audiencelist',
context: {
externalId: [
{
type: 'marketoStaticListId',
value: mslExternalId,
},
],
},
properties: {
listData: {
add: [],
remove: [],
},
},
};

// group input based on presence of field id
const groupedInputs = inputs.reduce(
(acc, input) => {
const { fields } = input.message;
const fieldsId = fields?.id;
if (isDefinedAndNotNull(fieldsId)) {
acc[0].push(input);
} else {
acc[1].push(input);
}
return acc;
},
[[], []],
);

// if there are no inputs with field id, then throw error
if (groupedInputs[0].length === 0) {
throw new InstrumentationError('No field id passed in the payload.');
}

// handle error for inputs with no field id
const errorArr = groupedInputs[1].map((input) =>
getErrorRespEvents(input.metadata, 400, 'No field id passed in the payload', {
[tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION,
[tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION,
}),
);

// handle for success case
groupedInputs[0].forEach((input) => {
const { fields, action } = input.message;
const fieldsId = fields.id;
if (action === 'insert') {
leadIdObj.insert.push({ id: fieldsId });
message.properties.listData.add.push({ id: fieldsId });
transformedAudienceMessage.properties.listData.add.push({ id: fieldsId });
successMetadataList.push(input.metadata);
} else if (action === 'delete') {
leadIdObj.delete.push({ id: fieldsId });
message.properties.listData.remove.push({ id: fieldsId });
transformedAudienceMessage.properties.listData.remove.push({ id: fieldsId });
successMetadataList.push(input.metadata);
} else {
throw new InstrumentationError('Invalid action type');
errorArr.push(
getErrorRespEvents(input.metadata, 400, 'Invalid action type', {
[tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION,
[tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION,
}),
);
}
return input;
});
const finalInput = [tokenisedInputs[0]];
finalInput[0].metadata = finalMetadata;
finalInput[0].message.properties.listData.add = leadIdObj.insert;
finalInput[0].message.properties.listData.remove = leadIdObj.delete;

return finalInput;
const transformedAudienceEvent = [
{
destination,
metadata: successMetadataList,
message: transformedAudienceMessage,
},
];

return {
errorArr,
transformedAudienceEvent,
};
}

module.exports = {
Expand Down

0 comments on commit 1aee8cb

Please sign in to comment.