diff --git a/src/services/destination/cdkV2Integration.ts b/src/services/destination/cdkV2Integration.ts index a91bc5674b..0789a98c1e 100644 --- a/src/services/destination/cdkV2Integration.ts +++ b/src/services/destination/cdkV2Integration.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable class-methods-use-this */ import { TransformationError } from '@rudderstack/integrations-lib'; -import groupBy from 'lodash/groupBy'; import { processCdkV2Workflow } from '../../cdk/v2/handler'; import { DestinationService } from '../../interfaces/DestinationService'; import { @@ -20,6 +19,7 @@ import { } from '../../types/index'; import stats from '../../util/stats'; import { CatchErr } from '../../util/types'; +import { groupRouterTransformEvents } from '../../v0/util'; import tags from '../../v0/util/tags'; import { DestinationPostTransformationService } from './postTransformation'; @@ -112,46 +112,38 @@ export class CDKV2DestinationService implements DestinationService { _version: string, requestMetadata: NonNullable, ): Promise { - const allDestEvents: object = groupBy( - events, - (ev: RouterTransformationRequestData) => ev.destination?.ID, - ); + const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events); const response: RouterTransformationResponse[][] = await Promise.all( - Object.values(allDestEvents).map( - async (destInputArray: RouterTransformationRequestData[]) => { - const metaTo = this.getTags( - destinationType, - destInputArray[0].metadata.destinationId, - destInputArray[0].metadata.workspaceId, - tags.FEATURES.ROUTER, - ); - metaTo.metadata = destInputArray[0].metadata; - try { - const doRouterTransformationResponse: RouterTransformationResponse[] = - await processCdkV2Workflow( - destinationType, - destInputArray, - tags.FEATURES.ROUTER, - requestMetadata, - ); - return DestinationPostTransformationService.handleRouterTransformSuccessEvents( - doRouterTransformationResponse, - undefined, - metaTo, - tags.IMPLEMENTATIONS.CDK_V2, - destinationType.toUpperCase(), + groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => { + const metaTo = this.getTags( + destinationType, + destInputArray[0].metadata.destinationId, + destInputArray[0].metadata.workspaceId, + tags.FEATURES.ROUTER, + ); + metaTo.metadata = destInputArray[0].metadata; + try { + const doRouterTransformationResponse: RouterTransformationResponse[] = + await processCdkV2Workflow( + destinationType, + destInputArray, + tags.FEATURES.ROUTER, + requestMetadata, ); - } catch (error: CatchErr) { - metaTo.metadatas = destInputArray.map((input) => input.metadata); - const erroredResp = - DestinationPostTransformationService.handleRouterTransformFailureEvents( - error, - metaTo, - ); - return [erroredResp]; - } - }, - ), + return DestinationPostTransformationService.handleRouterTransformSuccessEvents( + doRouterTransformationResponse, + undefined, + metaTo, + tags.IMPLEMENTATIONS.CDK_V2, + destinationType.toUpperCase(), + ); + } catch (error: CatchErr) { + metaTo.metadatas = destInputArray.map((input) => input.metadata); + const erroredResp = + DestinationPostTransformationService.handleRouterTransformFailureEvents(error, metaTo); + return [erroredResp]; + } + }), ); return response.flat(); } diff --git a/src/services/destination/nativeIntegration.ts b/src/services/destination/nativeIntegration.ts index 38a27ea71d..38ec934ff7 100644 --- a/src/services/destination/nativeIntegration.ts +++ b/src/services/destination/nativeIntegration.ts @@ -26,6 +26,7 @@ import { import stats from '../../util/stats'; import tags from '../../v0/util/tags'; import { DestinationPostTransformationService } from './postTransformation'; +import { groupRouterTransformEvents } from '../../v0/util'; export class NativeIntegrationDestinationService implements DestinationService { public init() {} @@ -99,11 +100,7 @@ export class NativeIntegrationDestinationService implements DestinationService { requestMetadata: NonNullable, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); - const allDestEvents: NonNullable = groupBy( - events, - (ev: RouterTransformationRequestData) => ev.destination?.ID, - ); - const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents); + const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events); const response: RouterTransformationResponse[][] = await Promise.all( groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => { const metaTO = this.getTags( diff --git a/src/v0/util/index.js b/src/v0/util/index.js index c3b9570bc4..6eb6312589 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2281,8 +2281,14 @@ const validateEventAndLowerCaseConversion = (event, isMandatory, convertToLowerC return convertToLowerCase ? event.toString().toLowerCase() : event.toString(); }; -const applyCustomMappings = (message, mappings) => - JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(message); +/** + * This function applies custom mappings to the event. + * @param {*} event The event to be transformed. + * @param {*} mappings The custom mappings to be applied. + * @returns {object} The transformed event. + */ +const applyCustomMappings = (event, mappings) => + JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(event); const applyJSONStringTemplate = (message, template) => JsonTemplateEngine.createAsSync(template.replace(/{{/g, '${').replace(/}}/g, '}'), { @@ -2290,6 +2296,17 @@ const applyJSONStringTemplate = (message, template) => }).evaluate(message); /** + * This groups the events by destination ID and source ID. + * Note: sourceID is only used for rETL events. + * @param {*} events The events to be grouped. + * @returns {array} The array of grouped events. + */ +const groupRouterTransformEvents = (events) => + Object.values( + lodash.groupBy(events, (ev) => [ev.destination?.ID, ev.context?.sources?.job_id || 'default']), + ); + +/* * Gets url path omitting the hostname & protocol * * **Note**: @@ -2364,6 +2381,7 @@ module.exports = { getValueFromMessage, getValueFromPropertiesOrTraits, getValuesAsArrayFromConfig, + groupRouterTransformEvents, handleSourceKeysOperation, hashToSha256, isAppleFamily, diff --git a/src/v0/util/index.test.js b/src/v0/util/index.test.js index 1b99e5ebb7..6bf689eca7 100644 --- a/src/v0/util/index.test.js +++ b/src/v0/util/index.test.js @@ -1,4 +1,4 @@ -const { TAG_NAMES, InstrumentationError } = require('@rudderstack/integrations-lib'); +const { InstrumentationError } = require('@rudderstack/integrations-lib'); const utilities = require('.'); const { getFuncTestData } = require('../../../test/testHelper'); const { FilteredEventsError } = require('./errorTypes'); @@ -8,6 +8,7 @@ const { generateExclusionList, combineBatchRequestsWithSameJobIds, validateEventAndLowerCaseConversion, + groupRouterTransformEvents, isAxiosError, } = require('./index'); const exp = require('constants'); @@ -693,6 +694,114 @@ describe('extractCustomFields', () => { }); }); +describe('groupRouterTransformEvents', () => { + it('should group events by destination.ID and context.sources.job_id', () => { + const events = [ + { + destination: { ID: 'dest1' }, + context: { sources: { job_id: 'job1' } }, + }, + { + destination: { ID: 'dest1' }, + context: { sources: { job_id: 'job2' } }, + }, + { + destination: { ID: 'dest2' }, + context: { sources: { job_id: 'job1' } }, + }, + ]; + const result = groupRouterTransformEvents(events); + + expect(result.length).toBe(3); // 3 unique groups + expect(result).toEqual([ + [{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }], + [{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job2' } } }], + [{ destination: { ID: 'dest2' }, context: { sources: { job_id: 'job1' } } }], + ]); + }); + + it('should group events by default job_id if context.sources.job_id is missing', () => { + const events = [ + { + destination: { ID: 'dest1' }, + context: { sources: {} }, + }, + { + destination: { ID: 'dest1' }, + context: { sources: { job_id: 'job1' } }, + }, + ]; + const result = groupRouterTransformEvents(events); + + expect(result.length).toBe(2); // 2 unique groups + expect(result).toEqual([ + [{ destination: { ID: 'dest1' }, context: { sources: {} } }], + [{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }], + ]); + }); + + it('should group events by default job_id if context or context.sources is missing', () => { + const events = [ + { + destination: { ID: 'dest1' }, + }, + { + destination: { ID: 'dest1' }, + context: { sources: { job_id: 'job1' } }, + }, + ]; + const result = groupRouterTransformEvents(events); + + expect(result.length).toBe(2); // 2 unique groups + expect(result).toEqual([ + [{ destination: { ID: 'dest1' } }], + [{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }], + ]); + }); + + it('should use "default" when destination.ID is missing', () => { + const events = [ + { + context: { sources: { job_id: 'job1' } }, + }, + { + destination: { ID: 'dest1' }, + context: { sources: { job_id: 'job1' } }, + }, + ]; + const result = groupRouterTransformEvents(events); + + expect(result.length).toBe(2); // 2 unique groups + expect(result).toEqual([ + [{ context: { sources: { job_id: 'job1' } } }], + [{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }], + ]); + }); + + it('should return an empty array when there are no events', () => { + const events = []; + const result = groupRouterTransformEvents(events); + + expect(result).toEqual([]); + }); + + it('should handle events with completely missing context and destination', () => { + const events = [ + {}, + { destination: { ID: 'dest1' } }, + { context: { sources: { job_id: 'job1' } } }, + ]; + const result = groupRouterTransformEvents(events); + + expect(result.length).toBe(3); // 3 unique groups + expect(result).toEqual([ + [{}], + [{ destination: { ID: 'dest1' } }], + [{ context: { sources: { job_id: 'job1' } } }], + ]); + }); +}); + describe('applyJSONStringTemplate', () => { it('should apply JSON string template to the payload', () => { const payload = {