From 96db947a21b96080dd5e4ac0fbd2a18b14aa0d3e Mon Sep 17 00:00:00 2001 From: Utsab Chowdhury Date: Sun, 7 Jan 2024 18:16:03 +0530 Subject: [PATCH 1/2] chore: modularise plugin adapter --- src/helpers/pluginAdapter.ts | 378 +++++++++++++++++++++-------------- 1 file changed, 224 insertions(+), 154 deletions(-) diff --git a/src/helpers/pluginAdapter.ts b/src/helpers/pluginAdapter.ts index da15b39b8c..6fda28840c 100644 --- a/src/helpers/pluginAdapter.ts +++ b/src/helpers/pluginAdapter.ts @@ -21,15 +21,35 @@ import { import { generateErrorObject } from '../v0/util'; import { MappedToDestinationKey } from '../constants'; -// error handling +type TransformedPayloadInfo = { + payload: any[]; + metadata: Metadata[]; + destination: Destination; +}; + +type ErrorInfo = { + error: any; + metadata: Metadata[]; + destination: Destination; +}; + +type ProcTransformedState = { + payload: TransformedOutput; + metadata: Metadata; + destination: Destination; +}; + +type RtTransformedState = { + payload: TransformedOutput[]; + metadata: Metadata[]; + destination: Destination; +}; + +type ErrorState = { metadata: Metadata; response: any; destination: Destination }; export class PluginAdapter { private static pluginCache: Map = new Map(); - /** - * - * @param metadata Deduplicate metadata based on jobId - */ static deduplicateMetadata(metadata: Metadata[]) { const jobIdMap = new Map(); const deduplicatedMetadata: Metadata[] = []; @@ -50,19 +70,73 @@ export class PluginAdapter { if (this.pluginCache.has(cacheKey)) { return this.pluginCache.get(cacheKey) as Integration; } - // TODO: default integration config need to make it dynamic by making some sort of config call or get it from config file - // const integrationConfig: IntegrationConfig = { - // name: integrationName, - // saveResponse: true, - // eventOrdering: true, - // plugins: ['preprocessor', 'multiplexer'], - // }; - const integration = await IntegrationsFactory.createIntegration(integrationName, workflowType); this.pluginCache.set(cacheKey, integration); return integration; } + static handleErrors(errors: ErrorInfo[], destination: Destination): ErrorState[] { + const errorList: ErrorState[] = []; + + if (errors.length > 0) { + const nestedErrorList = errors.map((e) => + e.metadata.map((metadata) => ({ + metadata, + response: generateErrorObject(e.error), + destination, + })), + ); + errorList.push(...nestedErrorList.flat()); + } + + return errorList; + } + + // Proc Transform Related Functions + + static handleProcSuccess( + responseList: TransformedPayloadInfo[], + destination: Destination, + ): ProcTransformedState[] { + const transformedPayloadList: ProcTransformedState[] = []; + + for (const [_, response] of responseList.entries()) { + for (const [index, payload] of response.payload.entries()) { + const transformedPayload = payload as TransformedOutput; + transformedPayloadList.push({ + payload: transformedPayload, + metadata: response.metadata[index], + destination, + }); + } + } + + return transformedPayloadList; + } + + static async processProcEvents( + integrationPlugin: Integration, + events: ProcessorTransformationRequest[], + destination: Destination, + ): Promise<{ transformedPayloadList: ProcTransformedState[]; errorList: ErrorState[] }> { + const eventsPayload = events.map((input) => ({ + event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload], + metadata: [input.metadata], + })); + + const output = await integrationPlugin.execute(eventsPayload, destination); + const responseList = output.resultContext; + const errors = output.errorResults; + + // Handle Errors + const errorList = PluginAdapter.handleErrors(errors, destination); + + // Handle Success + const transformedPayloadList = PluginAdapter.handleProcSuccess(responseList, destination); + + return { transformedPayloadList, errorList }; + } + public static async transformAtProcessor( inputs: ProcessorTransformationRequest[], integrationName: string, @@ -81,46 +155,17 @@ export class PluginAdapter { const result = await Promise.all( eventsPerDestinationId.map(async (inputs) => { - const events = inputs.map((input) => ({ - event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload], - metadata: [input.metadata], - })); const { destination } = inputs[0]; - const output = await integrationPlugin.execute(events, destination); - const responseList = output.resultContext; - const errors = output.errorResults; - - const errorList: { metadata: Metadata; response: any; destination: Destination }[] = []; - // handle the error scenario - if (errors.length > 0) { - const nestedErrorList = errors.map((e) => { - const errResponses = e.metadata.map((metadata) => ({ - metadata, - response: generateErrorObject(e.error), // add further tags here - destination, - })); - return errResponses; - }); - errorList.push(...nestedErrorList.flat()); - } + const output = await PluginAdapter.processProcEvents( + integrationPlugin, + inputs, + destination, + ); - // handle the success scenario - const transformedPayloadList: { - payload: TransformedOutput; - metadata: Metadata; - destination: Destination; - }[] = []; - for (const [_, response] of responseList.entries()) { - for (const [index, payload] of response.payload.entries()) { - const transformedPayload = payload as TransformedOutput; - transformedPayloadList.push({ - payload: transformedPayload, - metadata: response.metadata[index], - destination, - }); - } - } - return { transformedPayloadList, errorList }; + return { + transformedPayloadList: output.transformedPayloadList, + errorList: output.errorList, + }; }), ); @@ -135,6 +180,122 @@ export class PluginAdapter { return { allSuccessList, allErrorList }; } + // Rt Transform Related Functions + + static rankResponsesByUniqueJobIds(responseList: TransformedPayloadInfo[]) { + // ranking the responses based on the number of unique jobIds in the metadata array + const uniqueJobRank: { uniqueJobIds: number; index: number }[] = []; + for (const [index, response] of responseList.entries()) { + const uniqueJobIds = Array.from(new Set(response.metadata.map((meta) => meta.jobId))); + uniqueJobRank.push({ + uniqueJobIds: uniqueJobIds.length, + index, + }); + } + uniqueJobRank.sort((a, b) => b.uniqueJobIds - a.uniqueJobIds); + // ranking ends here with uniqueJobRank containing the index of the responseList in the order of the number of unique jobIds in the metadata array + // example: [ { uniqueJobIds: 3, index: 0 }, { uniqueJobIds: 2, index: 1 } ] + + return uniqueJobRank; + } + + static createFinalResponse( + uniqueJobRank: { uniqueJobIds: number; index: number }[], + responseList: TransformedPayloadInfo[], + destination: Destination, + ): RtTransformedState[] { + const finalResponse: RtTransformedState[] = []; + // creating a map of jobId to position in the metadata array + // example: { jobId1: 1, jobId2: 1, jobId3: 0, jobId4: 2} + // motivation: prevent metadata duplication in the final response at all levels + const jobIdPositionMap: Map = new Map(); + + for (const rank of uniqueJobRank) { + // iteratively checking payloads with the highest number of unique jobIds to lowest + const rankedResponse = responseList[rank.index]; + let isCurrentResponseAddedToFinalPayload = false; + // iterate each metadata in the rankedResponse to check if any jobId is already present in the finalResponse + for (const meta of rankedResponse.metadata) { + // check if the jobId already has a position in final response + if (jobIdPositionMap.has(meta.jobId)) { + // if yes, then we need append the entire rankedResponse including all the payloads and metadata at same position + const position = jobIdPositionMap.get(meta.jobId) as number; + const currentOutput = rankedResponse.payload.map( + (payload) => payload as TransformedOutput, + ); + finalResponse[position].payload.push(...currentOutput); + // push metdata to final response only if it is not already present + rankedResponse.metadata.forEach((meta) => { + // get all the exisitng jobIds in the metadata array at the position from the finalResponse + const jobIdsInResponse = finalResponse[position].metadata.map((fRmeta) => fRmeta.jobId); + // check if the jobId is already present in the metadata array + if (!jobIdsInResponse.includes(meta.jobId)) { + finalResponse[position].metadata.push(meta); + } + }); + finalResponse[position].destination = destination; + isCurrentResponseAddedToFinalPayload = true; + // break the loop as we have already appended the entire rankedResponse to the finalResponse + break; + } + } + // if the current rankedResponse is not added to the finalResponse, then we need to add it as a new entry + if (!isCurrentResponseAddedToFinalPayload) { + finalResponse.push({ + payload: rankedResponse.payload.map((payload) => payload as TransformedOutput), + // only stored deduplicated metadata in the final response + metadata: PluginAdapter.deduplicateMetadata(rankedResponse.metadata), + destination, + }); + // update the jobIdPositionMap for all the jobIds in the rankedResponse + rankedResponse.metadata.forEach((meta) => { + jobIdPositionMap.set(meta.jobId, finalResponse.length - 1); + }); + } + } + return finalResponse; + } + + static handleRtSuccess( + responseList: TransformedPayloadInfo[], + destination: Destination, + ): RtTransformedState[] { + // Rank responses based on unique jobIds in metadata array + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + // Create final response with deduplicated metadata + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + return finalResponse; + } + + static async processRtEvents( + integrationPlugin: Integration, + events: RouterTransformationRequestData[], + destination: Destination, + ): Promise<{ transformedPayloadList: RtTransformedState[]; errorList: ErrorState[] }> { + const inputPayload = events.map((input) => ({ + event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload], + metadata: [input.metadata], + })); + + const output = await integrationPlugin.execute(inputPayload, destination); + const responseList = output.resultContext; + const errors = output.errorResults; + + // Handle Errors + const errorList = PluginAdapter.handleErrors(errors, destination); + + // Handle Success + const transformedPayloadList = PluginAdapter.handleRtSuccess(responseList, destination); + + return { transformedPayloadList, errorList }; + } + public static async transformAtRouter( inputs: RouterTransformationRequestData[], integrationName: string, @@ -155,111 +316,20 @@ export class PluginAdapter { ); const result = await Promise.all( - eventsPerDestinationId.map(async (inputs) => { - const input = inputs.map((input) => ({ - event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload], - metadata: [input.metadata], - })); - - const { destination } = inputs[0]; - - // calling the plugin and we can expect batched and multiplexed responses - // example: [ { payload: [event1, event2, event3], metadata: [metadata1, metadata2, metdata3] }, { payload: [event3, event4], metadata: [metadata3, metadata4] } ] - - const output = await integrationPlugin.execute(input, destination); - const responseList = output.resultContext; - const errors = output.errorResults; - - // handle error scenario - const errorList: { metadata: Metadata; response: any; destination: Destination }[] = []; - if (errors.length > 0) { - const nestedErrorList = errors.map((e) => { - const errResponses = e.metadata.map((metadata) => ({ - metadata, - response: generateErrorObject(e.error), // add further tags here - destination, - })); - return errResponses; - }); - errorList.push(...nestedErrorList.flat()); - } - - // handle success scenraio - // ranking the responses based on the number of unique jobIds in the metadata array - const uniqueJobRank: { uniqueJobIds: number; index: number }[] = []; - for (const [index, response] of responseList.entries()) { - const uniqueJobIds = Array.from(new Set(response.metadata.map((meta) => meta.jobId))); - uniqueJobRank.push({ - uniqueJobIds: uniqueJobIds.length, - index, - }); - } - uniqueJobRank.sort((a, b) => b.uniqueJobIds - a.uniqueJobIds); - // ranking ends here with uniqueJobRank containing the index of the responseList in the order of the number of unique jobIds in the metadata array - // example: [ { uniqueJobIds: 3, index: 0 }, { uniqueJobIds: 2, index: 1 } ] - - const finalResponse: { - payload: TransformedOutput[]; - metadata: Metadata[]; - destination: Destination; - }[] = []; - // creating a map of jobId to position in the metadata array - // example: { jobId1: 1, jobId2: 1, jobId3: 0, jobId4: 2} - // motivation: prevent metadata duplication in the final response at all levels - const jobIdPositionMap: Map = new Map(); - for (const rank of uniqueJobRank) { - // iteratively checking payloads with the highest number of unique jobIds to lowest - const rankedResponse = responseList[rank.index]; - let isCurrentResponseAddedToFinalPayload = false; - // iterate each metadata in the rankedResponse to check if any jobId is already present in the finalResponse - for (const meta of rankedResponse.metadata) { - // check if the jobId already has a position in final response - if (jobIdPositionMap.has(meta.jobId)) { - // if yes, then we need append the entire rankedResponse including all the payloads and metadata at same position - const position = jobIdPositionMap.get(meta.jobId) as number; - const currentOutput = rankedResponse.payload.map( - (payload) => payload as TransformedOutput, - ); - finalResponse[position].payload.push(...currentOutput); - // push metdata to final response only if it is not already present - rankedResponse.metadata.forEach((meta) => { - // get all the exisitng jobIds in the metadata array at the position from the finalResponse - const jobIdsInResponse = finalResponse[position].metadata.map( - (fRmeta) => fRmeta.jobId, - ); - // check if the jobId is already present in the metadata array - if (!jobIdsInResponse.includes(meta.jobId)) { - finalResponse[position].metadata.push(meta); - } - }); - finalResponse[position].destination = destination; - isCurrentResponseAddedToFinalPayload = true; - // break the loop as we have already appended the entire rankedResponse to the finalResponse - break; - } - } - // if the current rankedResponse is not added to the finalResponse, then we need to add it as a new entry - if (!isCurrentResponseAddedToFinalPayload) { - finalResponse.push({ - payload: rankedResponse.payload.map((payload) => payload as TransformedOutput), - // only stored deduplicated metadata in the final response - metadata: PluginAdapter.deduplicateMetadata(rankedResponse.metadata), - destination, - }); - // update the jobIdPositionMap for all the jobIds in the rankedResponse - rankedResponse.metadata.forEach((meta) => { - jobIdPositionMap.set(meta.jobId, finalResponse.length - 1); - }); - } - } - return { transformedPayloadList: finalResponse, errorList }; + eventsPerDestinationId.map(async (events) => { + const output = await PluginAdapter.processRtEvents( + integrationPlugin, + events, + events[0].destination, + ); + return { + errorList: output.errorList, + transformedPayloadList: output.transformedPayloadList, + }; }), ); - const allSuccessList: { - payload: TransformedOutput[]; - metadata: Metadata[]; - destination: Destination; - }[] = result.flatMap((res) => res.transformedPayloadList); + + const allSuccessList = result.flatMap((res) => res.transformedPayloadList); const allErrorList: { metadata: Metadata; response: any; destination: Destination }[] = result.flatMap((res) => res.errorList); From bb21242165b5fbb17c553417af0d4c6941379cc5 Mon Sep 17 00:00:00 2001 From: Utsab Chowdhury Date: Tue, 16 Jan 2024 22:09:47 +0530 Subject: [PATCH 2/2] chore: update tests --- package-lock.json | 117 +++++- package.json | 4 +- src/helpers/pluginAdapter.test.ts | 651 +++++++++++++++++++++++++++++- src/helpers/pluginAdapter.ts | 23 +- 4 files changed, 773 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3075a2f1d6..7737fb5d0a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,8 +19,8 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.6", - "@rudderstack/integrations-lib": "^0.1.9", - "@rudderstack/integrations-store": "^0.1.1", + "@rudderstack/integrations-lib": "^0.1.10", + "@rudderstack/integrations-store": "^0.1.2", "@rudderstack/workflow-engine": "^0.6.9", "ajv": "^8.12.0", "ajv-draft-04": "^1.0.0", @@ -117,6 +117,59 @@ "typescript": "^5.0.4" } }, + "../poc-integration/rudder-integrations-lib": { + "name": "@rudderstack/integrations-lib", + "version": "0.1.10", + "extraneous": true, + "license": "MIT", + "dependencies": { + "@rudderstack/workflow-engine": "^0.5.7", + "axios": "^1.4.0", + "axios-mock-adapter": "^1.22.0", + "crypto": "^1.0.1", + "get-value": "^3.0.1", + "handlebars": "^4.7.8", + "lodash": "^4.17.21", + "moment": "^2.29.4", + "moment-timezone": "^0.5.43", + "set-value": "^4.1.0", + "sha256": "^0.2.0", + "tslib": "^2.4.0" + }, + "devDependencies": { + "@types/get-value": "^3.0.3", + "@types/jest": "^29.5.4", + "@types/lodash": "^4.14.195", + "@types/node": "^20.3.3", + "@types/set-value": "^4.0.1", + "@types/sha256": "^0.2.0", + "jest": "^29.4.3", + "pre-commit": "^1.2.2", + "prettier": "^2.8.4", + "ts-jest": "^29.0.5", + "ts-node": "^10.9.1", + "typescript": "^5.1.6" + } + }, + "../poc-integration/rudder-integrations-store": { + "name": "@rudderstack/integrations-store", + "version": "0.1.2", + "extraneous": true, + "license": "MIT", + "dependencies": { + "@rudderstack/integrations-lib": "^0.1.10", + "@rudderstack/workflow-engine": "^0.5.4" + }, + "devDependencies": { + "@types/jest": "^29.5.4", + "@types/node": "^20.3.3", + "jest": "^29.7.0", + "prettier": "^2.8.4", + "ts-jest": "^29.0.5", + "ts-node": "^10.9.1", + "typescript": "^5.1.6" + } + }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "dev": true, @@ -2907,6 +2960,7 @@ }, "node_modules/@isaacs/cliui": { "version": "8.0.2", + "dev": true, "license": "ISC", "dependencies": { "string-width": "^5.1.2", @@ -2922,6 +2976,7 @@ }, "node_modules/@isaacs/cliui/node_modules/ansi-regex": { "version": "6.0.1", + "dev": true, "license": "MIT", "engines": { "node": ">=12" @@ -2932,6 +2987,7 @@ }, "node_modules/@isaacs/cliui/node_modules/ansi-styles": { "version": "6.2.1", + "dev": true, "license": "MIT", "engines": { "node": ">=12" @@ -2942,6 +2998,7 @@ }, "node_modules/@isaacs/cliui/node_modules/string-width": { "version": "5.1.2", + "dev": true, "license": "MIT", "dependencies": { "eastasianwidth": "^0.2.0", @@ -2957,6 +3014,7 @@ }, "node_modules/@isaacs/cliui/node_modules/strip-ansi": { "version": "7.1.0", + "dev": true, "license": "MIT", "dependencies": { "ansi-regex": "^6.0.1" @@ -2970,6 +3028,7 @@ }, "node_modules/@isaacs/cliui/node_modules/wrap-ansi": { "version": "8.1.0", + "dev": true, "license": "MIT", "dependencies": { "ansi-styles": "^6.1.0", @@ -3519,6 +3578,7 @@ }, "node_modules/@pkgjs/parseargs": { "version": "0.11.0", + "dev": true, "license": "MIT", "optional": true, "engines": { @@ -3599,9 +3659,9 @@ } }, "node_modules/@rudderstack/integrations-lib": { - "version": "0.1.9", - "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.1.9.tgz", - "integrity": "sha512-ROi/LfI7PXqKDrjSig+1Rf2TQ8MgxJGJ7sAD1B0PmRKELQpxK6PLt8QF+vKXl8wYILQu2gwTkZ5o+uwmNKxGzg==", + "version": "0.1.10", + "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.1.10.tgz", + "integrity": "sha512-QfjPYT8bjOtgceaIDsPRx8BQxEHUObI5uDVYHt7vOLuTyyMz3DrhZInzuutmvh/3xvf6y64/FvXmx6YBrcewwA==", "dependencies": { "@rudderstack/workflow-engine": "^0.5.7", "axios": "^1.4.0", @@ -3666,11 +3726,11 @@ } }, "node_modules/@rudderstack/integrations-store": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/@rudderstack/integrations-store/-/integrations-store-0.1.1.tgz", - "integrity": "sha512-nCm7TZ6pa7gWywTeNElGxccdZRof7JYWeIVPGLqtxRisVKUOC6AXDX7XT/cNQyNyxjrmPMMQQYWR0y0BgTe+5Q==", + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/@rudderstack/integrations-store/-/integrations-store-0.1.2.tgz", + "integrity": "sha512-og7PgGlFwk9xt7vXGGxqWJ4IrSVUEU/Zl4+efYPS5z7bA6GOYHSX99IBZGRSGJIQP1ME1YwEIMKJ7xwlaNKOGQ==", "dependencies": { - "@rudderstack/integrations-lib": "^0.1.9", + "@rudderstack/integrations-lib": "^0.1.10", "@rudderstack/workflow-engine": "^0.5.4" } }, @@ -5077,6 +5137,7 @@ }, "node_modules/aggregate-error": { "version": "3.1.0", + "dev": true, "license": "MIT", "dependencies": { "clean-stack": "^2.0.0", @@ -5169,6 +5230,7 @@ }, "node_modules/ansi-styles": { "version": "4.3.0", + "dev": true, "license": "MIT", "dependencies": { "color-convert": "^2.0.1" @@ -5859,6 +5921,7 @@ }, "node_modules/chalk": { "version": "4.1.2", + "dev": true, "license": "MIT", "dependencies": { "ansi-styles": "^4.1.0", @@ -5908,6 +5971,7 @@ }, "node_modules/ci-info": { "version": "3.7.1", + "dev": true, "funding": [ { "type": "github", @@ -5937,6 +6001,7 @@ }, "node_modules/clean-stack": { "version": "2.2.0", + "dev": true, "license": "MIT", "engines": { "node": ">=6" @@ -6107,6 +6172,7 @@ }, "node_modules/color-convert": { "version": "2.0.1", + "dev": true, "license": "MIT", "dependencies": { "color-name": "~1.1.4" @@ -7785,6 +7851,7 @@ }, "node_modules/cross-spawn": { "version": "7.0.3", + "dev": true, "license": "MIT", "dependencies": { "path-key": "^3.1.0", @@ -8574,6 +8641,7 @@ }, "node_modules/eastasianwidth": { "version": "0.2.0", + "dev": true, "license": "MIT" }, "node_modules/ee-first": { @@ -8598,6 +8666,7 @@ }, "node_modules/emoji-regex": { "version": "9.2.2", + "dev": true, "license": "MIT" }, "node_modules/empty-dir": { @@ -9639,6 +9708,7 @@ }, "node_modules/foreground-child": { "version": "3.1.1", + "dev": true, "license": "ISC", "dependencies": { "cross-spawn": "^7.0.0", @@ -9653,6 +9723,7 @@ }, "node_modules/foreground-child/node_modules/signal-exit": { "version": "4.1.0", + "dev": true, "license": "ISC", "engines": { "node": ">=14" @@ -10056,6 +10127,7 @@ }, "node_modules/glob": { "version": "10.3.3", + "dev": true, "license": "ISC", "dependencies": { "foreground-child": "^3.1.0", @@ -10087,6 +10159,7 @@ }, "node_modules/glob/node_modules/brace-expansion": { "version": "2.0.1", + "dev": true, "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" @@ -10094,6 +10167,7 @@ }, "node_modules/glob/node_modules/minimatch": { "version": "9.0.3", + "dev": true, "license": "ISC", "dependencies": { "brace-expansion": "^2.0.1" @@ -10283,6 +10357,7 @@ }, "node_modules/graceful-fs": { "version": "4.2.10", + "dev": true, "license": "ISC" }, "node_modules/graphemer": { @@ -10343,6 +10418,7 @@ }, "node_modules/has-flag": { "version": "4.0.0", + "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -10462,6 +10538,7 @@ }, "node_modules/hosted-git-info": { "version": "4.1.0", + "dev": true, "license": "ISC", "dependencies": { "lru-cache": "^6.0.0" @@ -10472,6 +10549,7 @@ }, "node_modules/hosted-git-info/node_modules/lru-cache": { "version": "6.0.0", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -10482,6 +10560,7 @@ }, "node_modules/hosted-git-info/node_modules/yallist": { "version": "4.0.0", + "dev": true, "license": "ISC" }, "node_modules/html-escaper": { @@ -10701,6 +10780,7 @@ }, "node_modules/indent-string": { "version": "4.0.0", + "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -10732,6 +10812,7 @@ }, "node_modules/ini": { "version": "1.3.8", + "dev": true, "license": "ISC" }, "node_modules/inquirer": { @@ -11271,6 +11352,7 @@ }, "node_modules/isexe": { "version": "2.0.0", + "dev": true, "license": "ISC" }, "node_modules/isobject": { @@ -11371,6 +11453,7 @@ }, "node_modules/jackspeak": { "version": "2.2.2", + "dev": true, "license": "BlueOak-1.0.0", "dependencies": { "@isaacs/cliui": "^8.0.2" @@ -13026,6 +13109,7 @@ }, "node_modules/minipass": { "version": "7.0.2", + "dev": true, "license": "ISC", "engines": { "node": ">=16 || 14 >=14.17" @@ -13537,6 +13621,7 @@ }, "node_modules/normalize-package-data": { "version": "3.0.3", + "dev": true, "license": "BSD-2-Clause", "dependencies": { "hosted-git-info": "^4.0.1", @@ -16597,6 +16682,7 @@ }, "node_modules/p-map": { "version": "4.0.0", + "dev": true, "license": "MIT", "dependencies": { "aggregate-error": "^3.0.0" @@ -16726,6 +16812,7 @@ }, "node_modules/path-key": { "version": "3.1.1", + "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -16737,6 +16824,7 @@ }, "node_modules/path-scurry": { "version": "1.10.1", + "dev": true, "license": "BlueOak-1.0.0", "dependencies": { "lru-cache": "^9.1.1 || ^10.0.0", @@ -16751,6 +16839,7 @@ }, "node_modules/path-scurry/node_modules/lru-cache": { "version": "10.0.0", + "dev": true, "license": "ISC", "engines": { "node": "14 || >=16.14" @@ -18118,6 +18207,7 @@ }, "node_modules/shebang-command": { "version": "2.0.0", + "dev": true, "license": "MIT", "dependencies": { "shebang-regex": "^3.0.0" @@ -18128,6 +18218,7 @@ }, "node_modules/shebang-regex": { "version": "3.0.0", + "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -18573,6 +18664,7 @@ "node_modules/string-width-cjs": { "name": "string-width", "version": "4.2.3", + "dev": true, "license": "MIT", "dependencies": { "emoji-regex": "^8.0.0", @@ -18585,10 +18677,12 @@ }, "node_modules/string-width-cjs/node_modules/emoji-regex": { "version": "8.0.0", + "dev": true, "license": "MIT" }, "node_modules/string-width-cjs/node_modules/is-fullwidth-code-point": { "version": "3.0.0", + "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -18670,6 +18764,7 @@ "node_modules/strip-ansi-cjs": { "name": "strip-ansi", "version": "6.0.1", + "dev": true, "license": "MIT", "dependencies": { "ansi-regex": "^5.0.1" @@ -18781,6 +18876,7 @@ }, "node_modules/supports-color": { "version": "7.2.0", + "dev": true, "license": "MIT", "dependencies": { "has-flag": "^4.0.0" @@ -19046,6 +19142,7 @@ }, "node_modules/text-table": { "version": "0.2.0", + "dev": true, "license": "MIT" }, "node_modules/through": { @@ -19635,6 +19732,7 @@ }, "node_modules/which": { "version": "2.0.2", + "dev": true, "license": "ISC", "dependencies": { "isexe": "^2.0.0" @@ -19790,6 +19888,7 @@ "node_modules/wrap-ansi-cjs": { "name": "wrap-ansi", "version": "7.0.0", + "dev": true, "license": "MIT", "dependencies": { "ansi-styles": "^4.0.0", diff --git a/package.json b/package.json index 72465af983..1064bfad9c 100644 --- a/package.json +++ b/package.json @@ -64,8 +64,8 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.6", - "@rudderstack/integrations-lib": "^0.1.9", - "@rudderstack/integrations-store": "^0.1.1", + "@rudderstack/integrations-lib": "^0.1.10", + "@rudderstack/integrations-store": "^0.1.2", "@rudderstack/workflow-engine": "^0.6.9", "ajv": "^8.12.0", "ajv-draft-04": "^1.0.0", diff --git a/src/helpers/pluginAdapter.test.ts b/src/helpers/pluginAdapter.test.ts index 7fe45fa2e2..171e4783d0 100644 --- a/src/helpers/pluginAdapter.test.ts +++ b/src/helpers/pluginAdapter.test.ts @@ -6,7 +6,7 @@ import { WorkflowType, } from '@rudderstack/integrations-lib'; import { IntegrationsFactory } from '@rudderstack/integrations-store'; -import { PluginAdapter } from './pluginAdapter'; +import { ErrorInfo, ErrorState, PluginAdapter, TransformedPayloadInfo } from './pluginAdapter'; import { ProcessorTransformationRequest, RouterTransformationRequestData } from '../types'; import { generateErrorObject } from '../v0/util'; @@ -108,6 +108,307 @@ describe('getPlugin', () => { }); }); +describe('deduplicateMetadataByJobId', () => { + // Should return an empty array when given an empty array + it('should return an empty array when given an empty array', () => { + const metadata: Metadata[] = []; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual([]); + }); + + // Should return the same array when all metadata have unique jobId + it('should return the same array when all metadata have unique jobId', () => { + const metadata: Metadata[] = [generateMetadata(1), generateMetadata(2), generateMetadata(3)]; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual(metadata); + }); + + // Should remove duplicates based on jobId and return the deduplicated array + it('should remove duplicates based on jobId and return the deduplicated array', () => { + const metadata: Metadata[] = [ + generateMetadata(1), + generateMetadata(2), + generateMetadata(1), + generateMetadata(3), + ]; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual([generateMetadata(1), generateMetadata(2), generateMetadata(3)]); + }); + + // Should handle an array with a single metadata object + it('should handle an array with a single metadata object', () => { + const metadata: Metadata[] = [generateMetadata(1)]; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual(metadata); + }); + + // Should handle an array with multiple metadata objects having the same jobId + it('should handle an array with multiple metadata objects having the same jobId', () => { + const metadata: Metadata[] = [generateMetadata(1), generateMetadata(1), generateMetadata(1)]; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual([generateMetadata(1)]); + }); + + // Should handle an array with multiple metadata objects having different jobIds but with some duplicates + it('should handle an array with multiple metadata objects having different jobIds but with some duplicates', () => { + const metadata: Metadata[] = [ + generateMetadata(1), + generateMetadata(2), + generateMetadata(1), + generateMetadata(3), + generateMetadata(2), + ]; + const result = PluginAdapter.deduplicateMetadataByJobId(metadata); + expect(result).toEqual([generateMetadata(1), generateMetadata(2), generateMetadata(3)]); + }); +}); + +describe('handleErrors', () => { + // Returns an empty array when passed an empty array of errors and a destination + it('should return an empty array when passed an empty array of errors and a destination', () => { + const errors: ErrorInfo[] = []; + const destination: Destination = generateDestination('destination1', 'Destination 1'); + + const errorList = PluginAdapter.handleErrors(errors, destination); + + expect(errorList).toEqual([]); + }); + + // Returns an array of ErrorState objects when passed an array of ErrorInfo objects and a destination + it('should return an array of ErrorState objects when passed an array of ErrorInfo objects and a destination', () => { + const errors: ErrorInfo[] = [ + { + error: new Error('Error 1'), + metadata: [generateMetadata(1), generateMetadata(2)], + }, + { + error: new Error('Error 2'), + metadata: [generateMetadata(3)], + }, + ]; + const destination: Destination = generateDestination('destination1', 'Destination 1'); + + const errorList = PluginAdapter.handleErrors(errors, destination); + + expect(errorList).toEqual([ + { + metadata: generateMetadata(1), + response: generateErrorObject(new Error('Error 1')), + destination: generateDestination('destination1', 'Destination 1'), + }, + { + metadata: generateMetadata(2), + response: generateErrorObject(new Error('Error 1')), + destination: generateDestination('destination1', 'Destination 1'), + }, + { + metadata: generateMetadata(3), + response: generateErrorObject(new Error('Error 2')), + destination: generateDestination('destination1', 'Destination 1'), + }, + ]); + }); + + // Returns an empty array when passed an empty array of errors and no destination + it('should return an empty array when passed an empty array of errors and no destination', () => { + const errors: ErrorInfo[] = []; + + const errorList = PluginAdapter.handleErrors(errors, undefined as any); + + expect(errorList).toEqual([]); + }); + + // Generates ErrorState objects with the correct response property, which is generated by calling generateErrorObject with the error property of each ErrorInfo object + it('should generate ErrorState objects with the correct response property', () => { + // Arrange + const errors: ErrorInfo[] = [ + { + error: new Error('Error 1'), + metadata: [generateMetadata(1)], + }, + { + error: new Error('Error 2'), + metadata: [generateMetadata(2), generateMetadata(3)], + }, + ]; + + const expectedErrorList: ErrorState[] = [ + { + metadata: generateMetadata(1), + response: generateErrorObject(new Error('Error 1')), + destination: generateDestination('destination1', 'Destination 1'), + }, + { + metadata: generateMetadata(2), + response: generateErrorObject(new Error('Error 2')), + destination: generateDestination('destination1', 'Destination 1'), + }, + { + metadata: generateMetadata(3), + response: generateErrorObject(new Error('Error 2')), + destination: generateDestination('destination1', 'Destination 1'), + }, + ]; + + // Act + const errorList = PluginAdapter.handleErrors( + errors, + generateDestination('destination1', 'Destination 1'), + ); + + // Assert + expect(errorList).toEqual(expectedErrorList); + }); + + // Generates ErrorState objects with the correct metadata property, which is the metadata property of each ErrorInfo object has duplicates + it('should generate ErrorState objects with the correct metadata property', () => { + // Arrange + const errors: ErrorInfo[] = [ + { + error: new Error('Error 1'), + metadata: [generateMetadata(1), generateMetadata(1)], + }, + { + error: new Error('Error 2'), + metadata: [generateMetadata(2)], + }, + ]; + + const expectedErrorList: ErrorState[] = [ + { + metadata: generateMetadata(1), + response: generateErrorObject(new Error('Error 1')), + destination: generateDestination('destination1', 'Destination 1'), + }, + { + metadata: generateMetadata(2), + response: generateErrorObject(new Error('Error 2')), + destination: generateDestination('destination1', 'Destination 1'), + }, + ]; + + // Act + const errorList = PluginAdapter.handleErrors( + errors, + generateDestination('destination1', 'Destination 1'), + ); + + // Assert + expect(errorList).toEqual(expectedErrorList); + }); +}); + +describe('handleProcSuccess', () => { + // Returns an array of ProcTransformedState objects when given a valid responseList and destination. + it('should return an array of ProcTransformedState objects when given a valid responseList and destination', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [ + { id: 1, name: 'John' }, + { id: 2, name: 'Jane' }, + ], + metadata: [generateMetadata(12), generateMetadata(34)], + }, + { + payload: [ + { id: 3, name: 'Alice' }, + { id: 4, name: 'Bob' }, + ], + metadata: [generateMetadata(56), generateMetadata(78)], + }, + ]; + + const destination: Destination = generateDestination('destination1', 'Destination 1'); + + const transformedPayloadList = PluginAdapter.handleProcSuccess(responseList, destination); + + expect(transformedPayloadList).toEqual([ + { + payload: { id: 1, name: 'John' }, + metadata: generateMetadata(12), + destination, + }, + { + payload: { id: 2, name: 'Jane' }, + metadata: generateMetadata(34), + destination, + }, + { + payload: { id: 3, name: 'Alice' }, + metadata: generateMetadata(56), + destination, + }, + { + payload: { id: 4, name: 'Bob' }, + metadata: generateMetadata(78), + destination, + }, + ]); + }); + + // Handles a TransformedPayloadInfo object with a single payload. + it('should return an array of ProcTransformedState objects when given a valid responseList and destination', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [ + { id: 1, name: 'John' }, + { id: 2, name: 'Jane' }, + ], + metadata: [generateMetadata(123), generateMetadata(456)], + }, + ]; + + const destination: Destination = generateDestination('destination1', 'Destination 1'); + + const transformedPayloadList = PluginAdapter.handleProcSuccess(responseList, destination); + + expect(transformedPayloadList).toEqual([ + { + payload: { id: 1, name: 'John' }, + metadata: generateMetadata(123), + destination, + }, + { + payload: { id: 2, name: 'Jane' }, + metadata: generateMetadata(456), + destination, + }, + ]); + }); + + // Handles multiplexed scenario + it('should handle multiplexed scenario', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [ + { id: 1, name: 'John' }, + { id: 2, name: 'Jane' }, + ], + metadata: [generateMetadata(1), generateMetadata(1)], + }, + ]; + + const destination: Destination = generateDestination('destination1', 'Destination 1'); + + const transformedPayloadList = PluginAdapter.handleProcSuccess(responseList, destination); + + // Should return an array of ProcTransformedState objects with the same metadata allowed in processor transformation + expect(transformedPayloadList).toEqual([ + { + payload: { id: 1, name: 'John' }, + metadata: generateMetadata(1), + destination, + }, + { + payload: { id: 2, name: 'Jane' }, + metadata: generateMetadata(1), + destination, + }, + ]); + }); +}); + +// Tests for transformAtProcessor describe('transformAtProcessor', () => { // Transforms input events to output events it('should transform input events to output events', async () => { @@ -646,6 +947,354 @@ describe('transformAtProcessor', () => { }); }); +describe('rankResponsesByUniqueJobIds', () => { + // Returns a list of objects containing the index of the responseList in the order of the number of unique jobIds in the metadata array + it('should return a list of objects with the correct index order based on the number of unique jobIds', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [{ value: 'somePayload1' }], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [{ value: 'somePayload2' }], + metadata: [generateMetadata(4), generateMetadata(5)], + }, + { + payload: [{ value: 'somePayload3' }], + metadata: [generateMetadata(6), generateMetadata(7), generateMetadata(8)], + }, + ]; + + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + expect(uniqueJobRank).toEqual([ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 3, index: 2 }, + { uniqueJobIds: 2, index: 1 }, + ]); + }); + + // Works correctly when all metadata arrays have unique jobIds + it('should return a list of objects with the correct index order when all metadata arrays have unique jobIds', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [1, 2, 3], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [4, 5], + metadata: [generateMetadata(4), generateMetadata(5)], + }, + { + payload: [6, 7, 8], + metadata: [generateMetadata(6), generateMetadata(7), generateMetadata(8)], + }, + ]; + + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + expect(uniqueJobRank).toEqual([ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 3, index: 2 }, + { uniqueJobIds: 2, index: 1 }, + ]); + }); + + // Works correctly when all metadata arrays have the same jobIds + it('should return a list of objects with the correct index order when all metadata arrays have the same jobIds', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [{ value: 'somePayload1' }], + metadata: [generateMetadata(1), generateMetadata(1), generateMetadata(1)], + }, + { + payload: [{ value: 'somePayload2' }], + metadata: [generateMetadata(4), generateMetadata(4)], + }, + { + payload: [{ value: 'somePayload3' }], + metadata: [generateMetadata(6), generateMetadata(6), generateMetadata(6)], + }, + ]; + + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + expect(uniqueJobRank).toEqual([ + { uniqueJobIds: 1, index: 0 }, + { uniqueJobIds: 1, index: 1 }, + { uniqueJobIds: 1, index: 2 }, + ]); + }); + + // Works correctly when responseList contains only one object with an empty metadata array + it('should return a list of objects with the correct index order when responseList contains only one object with an empty metadata array', () => { + const responseList: TransformedPayloadInfo[] = [ + { + payload: [{ value: 'somePayload' }], + metadata: [], + }, + ]; + + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + expect(uniqueJobRank).toEqual([{ uniqueJobIds: 0, index: 0 }]); + }); + + // Works correctly when responseList is empty + it('should return an empty array when responseList is empty', () => { + const responseList: TransformedPayloadInfo[] = []; + + const uniqueJobRank = PluginAdapter.rankResponsesByUniqueJobIds(responseList); + + expect(uniqueJobRank).toEqual([]); + }); +}); + +describe('createFinalResponse', () => { + // The method correctly creates a final response object from a list of transformed payload info objects, a destination, and a unique job rank array. + it('should create a final response object with correct payloads, metadata, and destination', () => { + const uniqueJobRank = [ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 2, index: 1 }, + ]; + + const responseList: TransformedPayloadInfo[] = [ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [ + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2)], + }, + ]; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + destination, + }, + ]); + }); + + // The method correctly handles an empty list of transformed payload info objects by returning an empty list. + it('should return an empty list when the response list is empty', () => { + const uniqueJobRank = []; + + const responseList: TransformedPayloadInfo[] = []; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([]); + }); + + // The method correctly pushes metadata to the final response only if it is not already present. + it('should create a final response object with correct payloads, metadata, and destination', () => { + const uniqueJobRank = [ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 2, index: 1 }, + ]; + + const responseList: TransformedPayloadInfo[] = [ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [ + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(4), generateMetadata(5)], + }, + ]; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + destination, + }, + { + payload: [ + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(4), generateMetadata(5)], + + destination, + }, + ]); + }); + + // The method correctly appends the entire ranked response including all the payloads and metadata at the same position in the final response if the jobId is already present in the final response. + it('should create a final response object with correct payloads, metadata, and destination', () => { + const uniqueJobRank = [ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 2, index: 1 }, + ]; + + const responseList = [ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [ + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2)], + }, + ]; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + destination, + }, + ]); + }); + + // The method correctly handles a list of transformed payload info objects with empty payloads by not appending the corresponding metadata to the final response. + it('should not append metadata to the final response when the payload is empty', () => { + const uniqueJobRank = [ + { uniqueJobIds: 2, index: 0 }, + { uniqueJobIds: 1, index: 1 }, + ]; + + const responseList = [ + { + payload: [], + metadata: [generateMetadata(1), generateMetadata(2)], + }, + { + payload: [], + metadata: [generateMetadata(1)], + }, + ]; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([ + { + payload: [], + metadata: [generateMetadata(1), generateMetadata(2)], + destination, + }, + ]); + }); + + // The method correctly handles a list of transformed payload info objects with duplicate metadata by deduplicating the metadata in the final response. + it('should create a final response object with correct payloads, metadata, and destination', () => { + const uniqueJobRank = [ + { uniqueJobIds: 3, index: 0 }, + { uniqueJobIds: 2, index: 1 }, + ]; + + const responseList = [ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + }, + { + payload: [ + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(2)], + }, + ]; + + const destination = generateDestination('destination1', 'Destination 1'); + + const finalResponse = PluginAdapter.createFinalResponse( + uniqueJobRank, + responseList, + destination, + ); + + expect(finalResponse).toEqual([ + { + payload: [ + { id: 1, name: 'payload1' }, + { id: 2, name: 'payload2' }, + { id: 3, name: 'payload3' }, + { id: 4, name: 'payload4' }, + ], + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + destination, + }, + ]); + }); +}); + describe('transformAtRouter', () => { // Transforms input events to output events it('should transform input events to output events', async () => { diff --git a/src/helpers/pluginAdapter.ts b/src/helpers/pluginAdapter.ts index 6fda28840c..e67a75fc36 100644 --- a/src/helpers/pluginAdapter.ts +++ b/src/helpers/pluginAdapter.ts @@ -21,36 +21,34 @@ import { import { generateErrorObject } from '../v0/util'; import { MappedToDestinationKey } from '../constants'; -type TransformedPayloadInfo = { +export type TransformedPayloadInfo = { payload: any[]; metadata: Metadata[]; - destination: Destination; }; -type ErrorInfo = { +export type ErrorInfo = { error: any; metadata: Metadata[]; - destination: Destination; }; -type ProcTransformedState = { +export type ProcTransformedState = { payload: TransformedOutput; metadata: Metadata; destination: Destination; }; -type RtTransformedState = { +export type RtTransformedState = { payload: TransformedOutput[]; metadata: Metadata[]; destination: Destination; }; -type ErrorState = { metadata: Metadata; response: any; destination: Destination }; +export type ErrorState = { metadata: Metadata; response: any; destination: Destination }; export class PluginAdapter { private static pluginCache: Map = new Map(); - static deduplicateMetadata(metadata: Metadata[]) { + static deduplicateMetadataByJobId(metadata: Metadata[]) { const jobIdMap = new Map(); const deduplicatedMetadata: Metadata[] = []; for (const meta of metadata) { @@ -78,6 +76,12 @@ export class PluginAdapter { static handleErrors(errors: ErrorInfo[], destination: Destination): ErrorState[] { const errorList: ErrorState[] = []; + // deduplicate error metadata for each error + errors.forEach((error) => { + // eslint-disable-next-line no-param-reassign + error.metadata = PluginAdapter.deduplicateMetadataByJobId(error.metadata); + }); + if (errors.length > 0) { const nestedErrorList = errors.map((e) => e.metadata.map((metadata) => ({ @@ -93,7 +97,6 @@ export class PluginAdapter { } // Proc Transform Related Functions - static handleProcSuccess( responseList: TransformedPayloadInfo[], destination: Destination, @@ -244,7 +247,7 @@ export class PluginAdapter { finalResponse.push({ payload: rankedResponse.payload.map((payload) => payload as TransformedOutput), // only stored deduplicated metadata in the final response - metadata: PluginAdapter.deduplicateMetadata(rankedResponse.metadata), + metadata: PluginAdapter.deduplicateMetadataByJobId(rankedResponse.metadata), destination, }); // update the jobIdPositionMap for all the jobIds in the rankedResponse