Skip to content

Commit

Permalink
fix: the event ordering while batching
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Sep 8, 2023
1 parent 2d4223a commit 261aea2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 43 deletions.
29 changes: 17 additions & 12 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,11 @@ const processRouterDest = (inputs) => {
const groupedEvents = getGroupedEvents(inputs);
// eslint-disable-next-line sonarjs/no-unused-collection
const finalResp = [];
console.log('groupedEvents', JSON.stringify(groupedEvents));
groupedEvents.forEach((eventList) => {
let eventsChunk = []; // temporary variable to divide payload into chunks
let errorRespList = [];
if (eventList.length > 0) {
eventList.forEach((ev) => {
const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];
ev.forEach((event) => {
eventList.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
Expand All @@ -131,18 +129,25 @@ const processRouterDest = (inputs) => {
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
// divide the successful payloads till now into batches
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
// clear up the temporary variable
eventsChunk = [];
errorRespList.push(errRespEvent);
finalResp.push([...batchedResponseList, ...errorRespList]);
// putting it back as an empty array
errorRespList = [];
}
});
let batchedResponseList = [];
});
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
finalResp.push([...batchedResponseList, ...errorRespList]);
});
finalResp.push([...batchedResponseList]);
}
}


});
const allBatchedEvents =_.sortBy(finalResp.flat(), ['metadata.job_id']);
return allBatchedEvents;
Expand Down
47 changes: 16 additions & 31 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,33 +144,18 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

function splitArray(arr, delimiter) {
const result = [];
let subarray = [];

for (const item of arr) {
if (item === delimiter) {
if (subarray.length > 0) {
result.push([...subarray]);
}
subarray = [];
} else {
subarray.push(item);
}
}

if (subarray.length > 0) {
result.push([...subarray]);
}

return result;
}

const filterAndSplitEvents = (eachEventTypeArray) => {
/**
* Splits an array of events into subarrays of track event lists.
* If any other event type is encountered, it is kept as a separate subarray.
*
* @param {Array} eachUserJourney - An array of events. eg. [track, track,identify,identify,track, track]
* @returns {Array} - An array of subarrays. eg [[track, track],[identify],[identify],[track, track]]
*/
const filterAndSplitEvents = (eachUserJourney) => {
const delimiter = 'track';
let delimiterArray = [];
const resultArray = []
for (const item of eachEventTypeArray) {
for (const item of eachUserJourney) {
if (item.message.type === delimiter) {
delimiterArray.push(item);
} else {
Expand All @@ -185,15 +170,16 @@ const filterAndSplitEvents = (eachEventTypeArray) => {
if (delimiterArray.length > 0) {
resultArray.push(delimiterArray);
}
return resultArray;
};
return resultArray.flat();
};


/**
* Groups and orders events based on userId and job_id.
* Groups the input events based on the `userId` property and filters and splits the events based on a delimiter.
*
* @param {Array} inputs - An array of objects representing events, where each object has a `metadata` property containing `userId` and `job_id`.
* @returns {Array} - An array of events grouped by `userId` and ordered by `job_id`. Each element in the array represents a group of events with the same `userId`.
* @param {Array} inputs - An array of objects representing events with `metadata.userId` and `message.type` properties.
* @returns {Array} An array of arrays containing the grouped and filtered events.
* Each inner array represents a user journey and contains the filtered events.
*/
const getGroupedEvents = (inputs) => {
const typeBasedOrderedEvents = [];
Expand All @@ -203,8 +189,7 @@ const getGroupedEvents = (inputs) => {
const eachEventTypeArray = filterAndSplitEvents(eachUserJourney);
typeBasedOrderedEvents.push(eachEventTypeArray);
});
const flattenedArray = typeBasedOrderedEvents.flat();
return flattenedArray; // u1 : [identify, track], u2: [identify, track]
return typeBasedOrderedEvents;
}

module.exports = { networkHandler, getGroupedEvents };

0 comments on commit 261aea2

Please sign in to comment.