diff --git a/src/v0/destinations/marketo_static_list/transform.js b/src/v0/destinations/marketo_static_list/transform.js index 02dabb27f7..97346e36d4 100644 --- a/src/v0/destinations/marketo_static_list/transform.js +++ b/src/v0/destinations/marketo_static_list/transform.js @@ -4,9 +4,7 @@ const { defaultPostRequestConfig, defaultDeleteRequestConfig, generateErrorObject, - checkInvalidRtTfEvents, - getSuccessRespEvents, - handleRtTfSingleEventError, + simpleProcessRouterDest, } = require('../../util'); const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant'); const { getIds, validateMessageType } = require('./util'); @@ -108,38 +106,13 @@ const process = async (event, _processParams) => { return response; }; -const triggerProcess = async (inputs, reqMetadata, processParams) => { - const errorRespEvents = checkInvalidRtTfEvents(inputs); - if (errorRespEvents.length > 0) { - return errorRespEvents; - } - - const respList = await Promise.all( - inputs.map(async (input) => { - try { - let resp = input.message; - // transform if not already done - if (!input.message.statusCode) { - resp = await process(input, processParams); - } - if (Array.isArray(input.metadata)) { - return getSuccessRespEvents(resp, [...input.metadata], input.destination); - } - return getSuccessRespEvents(resp, [input.metadata], input.destination); - } catch (error) { - return handleRtTfSingleEventError(input, error, reqMetadata); - } - }), - ); - return respList; -}; - const processRouterDest = async (inputs, reqMetadata) => { // Token needs to be generated for marketo which will be done on input level. // If destination information is not present Error should be thrown - let token; + + const { destination } = inputs[0]; try { - token = await getAuthToken(formatConfig(inputs[0].destination)); + const token = await getAuthToken(formatConfig(destination)); if (!token) { throw new UnauthorizedError('Could not retrieve authorisation token'); } @@ -152,45 +125,38 @@ const processRouterDest = async (inputs, reqMetadata) => { return errResponses; } - // Checking previous status Code. Initially setting to false. - // If true then previous status is 500 and every subsequent event output should be - // sent with status code 500 to the router to be retried. - const tokenisedInputs = inputs.map((input) => ({ ...input, token })); // use lodash.groupby to group the inputs based on message type const transformedRecordEvent = []; let transformedAudienceEvent = []; - const groupedInputs = lodash.groupBy(tokenisedInputs, (input) => input.message.type); + const groupedInputs = lodash.groupBy(inputs, (input) => input.message.type); const respList = []; // process record events - if (groupedInputs.record && groupedInputs.record.length > 0) { + if (Array.isArray(groupedInputs.record) && groupedInputs.record.length > 0) { const groupedRecordInputs = groupedInputs.record; - const { staticListId } = groupedRecordInputs[0].destination.Config; + const { staticListId } = destination.Config; const externalIdGroupedRecordInputs = lodash.groupBy( groupedRecordInputs, (input) => getDestinationExternalID(input.message, 'MARKETO_STATIC_LIST-leadId') || staticListId, ); - Object.keys(externalIdGroupedRecordInputs).forEach((key) => { - const transformedGroupedRecordEvent = processRecordInputs(externalIdGroupedRecordInputs[key]); - transformedRecordEvent.push(transformedGroupedRecordEvent); - }); + const alltransformedGroupedRecordEvent = await Promise.all( + Object.keys(externalIdGroupedRecordInputs).map(async (key) => { + const transformedGroupedRecordEvent = await processRecordInputs( + externalIdGroupedRecordInputs[key], + destination, + ); + return transformedGroupedRecordEvent; + }), + ); - // old modular code - // transformedRecordEvent = processRecordInputs(groupedInputs.record, reqMetadata); - // const recordToAudienceTransformationOutput = transformForRecordEvent(groupedInputs.record); - // respList.push(...recordToAudienceTransformationOutput.errorArr); - // transformedRecordEvent = await triggerProcess( - // recordToAudienceTransformationOutput.transformedAudienceEvent, - // processEvent, - // reqMetadata, - // ); + transformedRecordEvent.push(alltransformedGroupedRecordEvent.flat()); } // process audiencelist events if (groupedInputs.audiencelist && groupedInputs.audiencelist.length > 0) { - transformedAudienceEvent = await triggerProcess( + transformedAudienceEvent = await simpleProcessRouterDest( groupedInputs.audiencelist, - processEvent, + process, reqMetadata, ); } diff --git a/src/v0/destinations/marketo_static_list/transformV2.js b/src/v0/destinations/marketo_static_list/transformV2.js index 0bfd9f7448..bf73dbb8ce 100644 --- a/src/v0/destinations/marketo_static_list/transformV2.js +++ b/src/v0/destinations/marketo_static_list/transformV2.js @@ -5,10 +5,15 @@ const { getDestinationExternalID, defaultRequestConfig, getSuccessRespEvents, + isDefinedAndNotNull, + generateErrorObject, + getErrorRespEvents, } = require('../../util'); const { JSON_MIME_TYPE } = require('../../util/constant'); const { MAX_LEAD_IDS_SIZE } = require('./config'); -const { InstrumentationError } = require('../../util/errorTypes'); +const { InstrumentationError, UnauthorizedError } = require('../../util/errorTypes'); +const { getAuthToken } = require('../marketo/transform'); +const { formatConfig } = require('../marketo/config'); /** * Generates the final response structure to be sent to the destination @@ -49,14 +54,9 @@ const responseBuilder = (endPoint, leadIds, operation, token) => { * @param {*} operation * @returns an array of response objects, where each object represents a batched response for a chunk of lead IDs. */ -const batchResponseBuilder = (groupedRecordInputs, Config, token, leadIds, operation) => { - const { accountId, staticListId } = Config; - const { message } = groupedRecordInputs[0]; - const listId = getDestinationExternalID(message, 'MARKETO_STATIC_LIST-leadId') || staticListId; +const batchResponseBuilder = (listId, Config, token, leadIds, operation) => { + const { accountId } = Config; const endpoint = `https://${accountId}.mktorest.com/rest/v1/lists/${listId}/leads.json?`; - if (!listId) { - throw new InstrumentationError('No static listId is provided'); - } const response = []; const leadIdsChunks = lodash.chunk(leadIds, MAX_LEAD_IDS_SIZE); leadIdsChunks.forEach((ids) => { @@ -71,39 +71,63 @@ const batchResponseBuilder = (groupedRecordInputs, Config, token, leadIds, opera * @param {*} groupedRecordInputs * @returns An array containing the batched responses for the insert and delete actions along with the metadata. */ -function processRecordInputs(groupedRecordInputs) { +async function processRecordInputs(groupedRecordInputs, destination) { + const token = await getAuthToken(formatConfig(destination)); + if (!token) { + throw new UnauthorizedError('Authorization failed'); + } + const { Config } = destination; + const listId = + getDestinationExternalID(groupedRecordInputs[0].message, 'MARKETO_STATIC_LIST-leadId') || + Config.staticListId; + // iterate through each input and group field id based on action const insertFields = []; const deleteFields = []; - const finalMetadata = []; - const { Config } = groupedRecordInputs[0].destination; + const successMetadataForInsert = []; + const successMetadataForDelete = []; + const errorMetadata = []; + groupedRecordInputs.forEach((input) => { const { fields, action } = input.message; - const fieldsId = fields.id; - if (action === 'insert') { - insertFields.push(fieldsId); - finalMetadata.push(input.metadata); - } else if (action === 'delete') { - deleteFields.push(fieldsId); - finalMetadata.push(input.metadata); + const fieldId = fields.id; + if (action === 'insert' && isDefinedAndNotNull(fieldId)) { + insertFields.push(fieldId); + successMetadataForInsert.push(input.metadata); + } else if (action === 'delete' && isDefinedAndNotNull(fieldId)) { + deleteFields.push(fieldId); + successMetadataForDelete.push(input.metadata); + } else { + errorMetadata.push(input.metadata); } }); - const deleteResponse = batchResponseBuilder( - groupedRecordInputs, - Config, - groupedRecordInputs[0].token, - deleteFields, - 'delete', + const deletePayloads = batchResponseBuilder(listId, Config, token, deleteFields, 'delete'); + + const deleteResponse = getSuccessRespEvents( + deletePayloads, + successMetadataForDelete, + destination, + true, ); - const insertResponse = batchResponseBuilder( - groupedRecordInputs, - Config, - groupedRecordInputs[0].token, - insertFields, - 'insert', + + const insertPayloads = batchResponseBuilder(listId, Config, token, insertFields, 'insert'); + + const insertResponse = getSuccessRespEvents( + insertPayloads, + successMetadataForInsert, + destination, + true, + ); + + const error = new InstrumentationError( + 'Invalid leadIds format or no leadIds found neither to add nor to remove', ); - const batchedResponse = [...deleteResponse, ...insertResponse]; - return getSuccessRespEvents(batchedResponse, finalMetadata, groupedRecordInputs[0].destination); + const errorObj = generateErrorObject(error); + const errorResponseList = errorMetadata.map((metadata) => + getErrorRespEvents(metadata, errorObj.status, errorObj.message, errorObj.statTags), + ); + + return [deleteResponse, insertResponse, ...errorResponseList]; } module.exports = {