From 1aee8cb683d1d8831b38e42e2a7ecd107b9ffc4a Mon Sep 17 00:00:00 2001 From: Utsab Chowdhury Date: Thu, 9 Nov 2023 19:40:01 +0530 Subject: [PATCH] refactor transformation for record to audience list flow --- .../marketo_static_list/transform.js | 82 ++++++++----- .../destinations/marketo_static_list/util.js | 112 +++++++++++++----- 2 files changed, 131 insertions(+), 63 deletions(-) diff --git a/src/v0/destinations/marketo_static_list/transform.js b/src/v0/destinations/marketo_static_list/transform.js index d70d2b5a2e..2ef28e7a34 100644 --- a/src/v0/destinations/marketo_static_list/transform.js +++ b/src/v0/destinations/marketo_static_list/transform.js @@ -4,6 +4,9 @@ const { defaultPostRequestConfig, defaultDeleteRequestConfig, generateErrorObject, + checkInvalidRtTfEvents, + getSuccessRespEvents, + handleRtTfSingleEventError, } = require('../../util'); const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant'); const { getIds, validateMessageType, transformForRecordEvent } = require('./util'); @@ -11,7 +14,6 @@ const { getDestinationExternalID, defaultRequestConfig, getErrorRespEvents, - simpleProcessRouterDest, } = require('../../util'); const { formatConfig, MAX_LEAD_IDS_SIZE } = require('./config'); const Cache = require('../../util/cache'); @@ -57,10 +59,10 @@ const batchResponseBuilder = (message, Config, token, leadIds, operation) => { return response; }; -const processEvent = (input) => { - const { token, message, destination } = input; +const processEvent = (event) => { + const { token, message, destination } = event; const { Config } = destination; - validateMessageType(message, ['audiencelist', 'record']); + validateMessageType(message, ['audiencelist']); const response = []; let toAdd; let toRemove; @@ -94,16 +96,43 @@ const processEvent = (input) => { return response; }; -const process = async (event) => { +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const process = async (event, _processParams) => { const token = await getAuthToken(formatConfig(event.destination)); - if (!token) { throw new UnauthorizedError('Authorization failed'); } - const response = processEvent({ ...event, token }); + const updatedEvent = { ...event, token }; + const response = processEvent(updatedEvent); return response; }; +const triggerProcess = async (inputs, reqMetadata, processParams) => { + const errorRespEvents = checkInvalidRtTfEvents(inputs); + if (errorRespEvents.length > 0) { + return errorRespEvents; + } + + const respList = await Promise.all( + inputs.map(async (input) => { + try { + let resp = input.message; + // transform if not already done + if (!input.message.statusCode) { + resp = await process(input, processParams); + } + if (Array.isArray(input.metadata)) { + return getSuccessRespEvents(resp, [...input.metadata], input.destination); + } + return getSuccessRespEvents(resp, [input.metadata], input.destination); + } catch (error) { + return handleRtTfSingleEventError(input, error, reqMetadata); + } + }), + ); + return respList; +}; + const processRouterDest = async (inputs, reqMetadata) => { // Token needs to be generated for marketo which will be done on input level. // If destination information is not present Error should be thrown @@ -114,46 +143,41 @@ const processRouterDest = async (inputs, reqMetadata) => { throw new UnauthorizedError('Could not retrieve authorisation token'); } } catch (error) { - // Not using handleRtTfSingleEventError here as this is for multiple events - const errObj = generateErrorObject(error); - const respEvents = getErrorRespEvents( - inputs.map((input) => input.metadata), - errObj.status, - errObj.message, - errObj.statTags, + const errorObj = generateErrorObject(error); + const errResponses = inputs.map((input) => + getErrorRespEvents(input.metadata, errorObj.status, errorObj.message, errorObj.statTags), ); - return [{ ...respEvents, destination: inputs?.[0]?.destination }]; + + return errResponses; } // Checking previous status Code. Initially setting to false. // If true then previous status is 500 and every subsequent event output should be // sent with status code 500 to the router to be retried. const tokenisedInputs = inputs.map((input) => ({ ...input, token })); - const leadIdObj = { - insert: [], - delete: [], - }; // use lodash.groupby to group the inputs based on message type let transformedRecordEvent = []; let transformedAudienceEvent = []; const groupedInputs = lodash.groupBy(tokenisedInputs, (input) => input.message.type); + const respList = []; if (groupedInputs.record && groupedInputs.record.length > 0) { - const finalInputForRecordEvent = transformForRecordEvent(groupedInputs.record, leadIdObj); - transformedRecordEvent = await simpleProcessRouterDest( - finalInputForRecordEvent, + const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record); + respList.push(...recordToAudienceTransformationOutput.errorArr); + transformedRecordEvent = await triggerProcess( + recordToAudienceTransformationOutput.transformedAudienceEvent, processEvent, reqMetadata, ); } if (groupedInputs.audiencelist && groupedInputs.audiencelist.length > 0) { - transformedAudienceEvent = await simpleProcessRouterDest( + transformedAudienceEvent = await triggerProcess( groupedInputs.audiencelist, processEvent, reqMetadata, ); } - const respList = [...transformedRecordEvent, ...transformedAudienceEvent]; + respList.push(...transformedRecordEvent, ...transformedAudienceEvent); return respList; }; @@ -164,16 +188,10 @@ const processRouterDest = async (inputs, reqMetadata) => { */ function processMetadataForRouter(output) { const { metadata, destination } = output; - // check if metadata[0] is an array or not - let clonedMetadata; - if (Array.isArray(metadata[0])) { - [clonedMetadata] = cloneDeep(metadata); - } else { - clonedMetadata = cloneDeep(metadata); - } + const clonedMetadata = cloneDeep(metadata); clonedMetadata.forEach((metadataElement) => { // eslint-disable-next-line no-param-reassign - metadataElement.destInfo = { authKey: destination.ID }; + metadataElement.destInfo = { authKey: destination?.ID }; }); return clonedMetadata; } diff --git a/src/v0/destinations/marketo_static_list/util.js b/src/v0/destinations/marketo_static_list/util.js index 7fd20a56d6..48372f5fe0 100644 --- a/src/v0/destinations/marketo_static_list/util.js +++ b/src/v0/destinations/marketo_static_list/util.js @@ -1,5 +1,7 @@ /* eslint-disable unicorn/consistent-destructuring */ +const { getDestinationExternalID, isDefinedAndNotNull, getErrorRespEvents } = require('../../util'); const { InstrumentationError } = require('../../util/errorTypes'); +const tags = require('../../util/tags'); /** * Fetches the ids from the array of objects @@ -35,43 +37,91 @@ const validateMessageType = (message, allowedTypes) => { } }; -function transformForRecordEvent(inputs, leadIdObj) { - const finalMetadata = []; - // iterate through each inputs metadata and create a final metadata - const tokenisedInputs = inputs.map((input) => { - const { message } = input; - const { metadata } = input; - finalMetadata.push(metadata); - const { fields, action, type } = message; - if (type !== 'record') { - throw new InstrumentationError('Invalid message type, Supported message type is record.'); - } - const { properties } = message; - if (!properties) { - message.properties = {}; - } - message.properties.listData = message?.properties.listData || { add: [], remove: [] }; - const fieldsId = fields?.id; - if (fieldsId === undefined) { - throw new InstrumentationError('No lead id passed in the payload.'); - } +function transformForRecordEvent(inputs) { + const successMetadataList = []; + const { message, destination } = inputs[0]; + const { staticListId } = destination.Config; + // TODO: Rethink this process + const mslExternalId = getDestinationExternalID(message, 'marketoStaticListId') || staticListId; + // Skeleton for audience message + const transformedAudienceMessage = { + type: 'audiencelist', + context: { + externalId: [ + { + type: 'marketoStaticListId', + value: mslExternalId, + }, + ], + }, + properties: { + listData: { + add: [], + remove: [], + }, + }, + }; + + // group input based on presence of field id + const groupedInputs = inputs.reduce( + (acc, input) => { + const { fields } = input.message; + const fieldsId = fields?.id; + if (isDefinedAndNotNull(fieldsId)) { + acc[0].push(input); + } else { + acc[1].push(input); + } + return acc; + }, + [[], []], + ); + + // if there are no inputs with field id, then throw error + if (groupedInputs[0].length === 0) { + throw new InstrumentationError('No field id passed in the payload.'); + } + + // handle error for inputs with no field id + const errorArr = groupedInputs[1].map((input) => + getErrorRespEvents(input.metadata, 400, 'No field id passed in the payload', { + [tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION, + [tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION, + }), + ); + + // handle for success case + groupedInputs[0].forEach((input) => { + const { fields, action } = input.message; + const fieldsId = fields.id; if (action === 'insert') { - leadIdObj.insert.push({ id: fieldsId }); - message.properties.listData.add.push({ id: fieldsId }); + transformedAudienceMessage.properties.listData.add.push({ id: fieldsId }); + successMetadataList.push(input.metadata); } else if (action === 'delete') { - leadIdObj.delete.push({ id: fieldsId }); - message.properties.listData.remove.push({ id: fieldsId }); + transformedAudienceMessage.properties.listData.remove.push({ id: fieldsId }); + successMetadataList.push(input.metadata); } else { - throw new InstrumentationError('Invalid action type'); + errorArr.push( + getErrorRespEvents(input.metadata, 400, 'Invalid action type', { + [tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION, + [tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION, + }), + ); } - return input; }); - const finalInput = [tokenisedInputs[0]]; - finalInput[0].metadata = finalMetadata; - finalInput[0].message.properties.listData.add = leadIdObj.insert; - finalInput[0].message.properties.listData.remove = leadIdObj.delete; - return finalInput; + const transformedAudienceEvent = [ + { + destination, + metadata: successMetadataList, + message: transformedAudienceMessage, + }, + ]; + + return { + errorArr, + transformedAudienceEvent, + }; } module.exports = {