diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 4db1856535..d2f68aed28 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -9,6 +9,7 @@ const { } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); +const { getGroupedEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -102,38 +103,49 @@ const processRouterDest = (inputs) => { if (errorRespEvents.length > 0) { return errorRespEvents; } - - const eventsChunk = []; // temporary variable to divide payload into chunks - const errorRespList = []; - - inputs.forEach((event) => { - try { - if (event.message.statusCode) { - // already transformed event - eventsChunk.push(event); - } else { - // if not transformed - let response = process(event); - response = Array.isArray(response) ? response : [response]; - response.forEach((res) => { - eventsChunk.push({ - message: res, - metadata: event.metadata, - destination: event.destination, - }); + const groupedEvents = getGroupedEvents(inputs); + // eslint-disable-next-line sonarjs/no-unused-collection + const finalResp = []; + console.log('groupedEvents', JSON.stringify(groupedEvents)); + groupedEvents.forEach((eventList) => { + if (eventList.length > 0) { + eventList.forEach((ev) => { + const eventsChunk = []; // temporary variable to divide payload into chunks + const errorRespList = []; + ev.forEach((event) => { + try { + if (event.message.statusCode) { + // already transformed event + eventsChunk.push(event); + } else { + // if not transformed + let response = process(event); + response = Array.isArray(response) ? response : [response]; + response.forEach((res) => { + eventsChunk.push({ + message: res, + metadata: event.metadata, + destination: event.destination, + }); + }); + } + } catch (error) { + const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); + errorRespList.push(errRespEvent); + } }); - } - } catch (error) { - const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); - errorRespList.push(errRespEvent); + let batchedResponseList = []; + if (eventsChunk.length > 0) { + batchedResponseList = batchEvents(eventsChunk); + } + finalResp.push([...batchedResponseList, ...errorRespList]); + }); } - }); - - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); - } - return [...batchedResponseList, ...errorRespList]; + + + }); + const allBatchedEvents =_.sortBy(finalResp.flat(), ['metadata.job_id']); + return allBatchedEvents; }; module.exports = { process, processRouterDest }; diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index cc3aba78b2..91eaafc833 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,4 +1,5 @@ /* eslint-disable no-param-reassign */ +const _ = require('lodash'); const getValue = require('get-value'); const { getDynamicErrorType, @@ -143,4 +144,67 @@ function networkHandler() { this.processAxiosResponse = processAxiosResponse; } -module.exports = { networkHandler }; +function splitArray(arr, delimiter) { + const result = []; + let subarray = []; + + for (const item of arr) { + if (item === delimiter) { + if (subarray.length > 0) { + result.push([...subarray]); + } + subarray = []; + } else { + subarray.push(item); + } + } + + if (subarray.length > 0) { + result.push([...subarray]); + } + + return result; +} + +const filterAndSplitEvents = (eachEventTypeArray) => { + const delimiter = 'track'; + let delimiterArray = []; + const resultArray = [] + for (const item of eachEventTypeArray) { + if (item.message.type === delimiter) { + delimiterArray.push(item); + } else { + if(delimiterArray.length > 0) { + resultArray.push(delimiterArray); + delimiterArray = []; + } + resultArray.push([item]); + } + } + // Push any remaining delimiterArray + if (delimiterArray.length > 0) { + resultArray.push(delimiterArray); + } + return resultArray; +}; + + +/** + * Groups and orders events based on userId and job_id. + * + * @param {Array} inputs - An array of objects representing events, where each object has a `metadata` property containing `userId` and `job_id`. + * @returns {Array} - An array of events grouped by `userId` and ordered by `job_id`. Each element in the array represents a group of events with the same `userId`. + */ +const getGroupedEvents = (inputs) => { + const typeBasedOrderedEvents = []; + const userIdEventMap = _.groupBy(inputs, 'metadata.userId'); + const eventGroupedByUserId = Object.values(userIdEventMap); + eventGroupedByUserId.forEach((eachUserJourney) => { + const eachEventTypeArray = filterAndSplitEvents(eachUserJourney); + typeBasedOrderedEvents.push(eachEventTypeArray); + }); + const flattenedArray = typeBasedOrderedEvents.flat(); + return flattenedArray; // u1 : [identify, track], u2: [identify, track] +} + +module.exports = { networkHandler, getGroupedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js new file mode 100644 index 0000000000..c5de31609f --- /dev/null +++ b/src/v0/destinations/bqstream/util.test.js @@ -0,0 +1,62 @@ +// Generated by CodiumAI +const { getGroupedEvents } = require('./util'); +describe('getGroupedEvents', () => { + + // // Tests that the function returns an empty array when inputs is empty + it('should return an empty array when inputs is empty', () => { + const inputs = []; + const result = getGroupedEvents(inputs); + expect(result).toEqual([]); + }); + + // // Tests that the function returns an array with one element when inputs has only one event + it('should return an array with one element when inputs has only one event', () => { + const inputs = [{ metadata: { userId: '1', job_id: '123' } }]; + const result = getGroupedEvents(inputs); + expect(result).toEqual([[{ metadata: { userId: '1', job_id: '123' } }]]); + }); + + // // Tests that the function returns an array with one element when inputs has multiple events but all have the same userId + it('should return an array with one element when inputs has multiple events with the same userId', () => { + const inputs = [ + { metadata: { userId: '1', job_id: '123' } }, + { metadata: { userId: '1', job_id: '456' } }, + { metadata: { userId: '1', job_id: '789' } } + ]; + const result = getGroupedEvents(inputs); + expect(result).toEqual([[{ metadata: { userId: '1', job_id: '123' } }, { metadata: { userId: '1', job_id: '456' } }, { metadata: { userId: '1', job_id: '789' } }]]); + }); + + // Tests that the function returns an array with multiple elements when inputs has multiple events with different userIds + it('should return an array with multiple elements when inputs has multiple events with different userIds', () => { + const inputs = [ + { metadata: { userId: '1', job_id: '123' } }, + { metadata: { userId: '2', job_id: '456' } }, + { metadata: { userId: '3', job_id: '789' } } + ]; + const result = getGroupedEvents(inputs); + console.log(JSON.stringify(result)); + expect(result).toEqual([ + [{ metadata: { userId: '1', job_id: '123' } }], + [{ metadata: { userId: '2', job_id: '456' } }], + [{ metadata: { userId: '3', job_id: '789' } }] + ]); + }); + + // Tests that the function returns an array with multiple elements when inputs has multiple events with same userIds and some have the different job_id + it('should return an array with multiple elements when inputs has multiple events with same userIds and some have the different job_id', () => { + const inputs = [ + { metadata: { userId: '1', job_id: '123' } }, + { metadata: { userId: '2', job_id: '456' } }, + { metadata: { userId: '3', job_id: '789' } }, + { metadata: { userId: '1', job_id: '981' } } + ]; + const result = getGroupedEvents(inputs); + expect(result).toEqual([ + [{ metadata: { userId: '1', job_id: '123' } }, + { metadata: { userId: '1', job_id: '981' } }], + [{ metadata: { userId: '2', job_id: '456' } }], + [{ metadata: { userId: '3', job_id: '789' } }] + ]); + }); +}); diff --git a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index 0f6e663ad0..e746e0f8a4 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -73,6 +73,7 @@ export const data = [ }, metadata: { jobId: 1, + userId: 'user12345', }, destination: { Config: { @@ -153,6 +154,7 @@ export const data = [ }, metadata: { jobId: 2, + userId: 'user12345', }, destination: { Config: { @@ -203,9 +205,11 @@ export const data = [ metadata: [ { jobId: 1, + userId: 'user12345', }, { jobId: 2, + userId: 'user12345', }, ], batched: true,