Commit
fix: initial commit
shrouti1507 committed Sep 6, 2023
1 parent 9df4885 commit a119c4b
Showing 4 changed files with 173 additions and 31 deletions.
72 changes: 42 additions & 30 deletions src/v0/destinations/bqstream/transform.js
@@ -9,6 +9,7 @@ const {
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getGroupedEvents } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
@@ -102,38 +103,49 @@ const processRouterDest = (inputs) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
const groupedEvents = getGroupedEvents(inputs);
// eslint-disable-next-line sonarjs/no-unused-collection
const finalResp = [];
console.log('groupedEvents', JSON.stringify(groupedEvents));
groupedEvents.forEach((eventList) => {
if (eventList.length > 0) {
eventList.forEach((ev) => {
const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];
ev.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
}
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
finalResp.push([...batchedResponseList, ...errorRespList]);
});
}
});

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
return [...batchedResponseList, ...errorRespList];


});
const allBatchedEvents = _.sortBy(finalResp.flat(), ['metadata.job_id']);
return allBatchedEvents;
};

module.exports = { process, processRouterDest };
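
For orientation, a simplified, self-contained sketch of the router flow this change moves toward: per-user/per-type chunks come from getGroupedEvents, each chunk is batched on its own, and the combined responses are re-ordered by job id. This is an illustration, not the committed implementation; batchEvents is stubbed here, and the ordering step is simplified to sort on the lowest job_id in each batch.

const _ = require('lodash');
const { getGroupedEvents } = require('./util');

// Illustrative stand-in for the repository's batchEvents helper.
const batchEvents = (chunk) => [{ batched: true, metadata: chunk.map((e) => e.metadata) }];

const routerSketch = (inputs) => {
  const finalResp = [];
  getGroupedEvents(inputs).forEach((chunk) => {
    if (chunk.length > 0) {
      finalResp.push(batchEvents(chunk));
    }
  });
  // Flatten the per-chunk responses and restore a deterministic overall order.
  return _.sortBy(finalResp.flat(), (resp) => Math.min(...resp.metadata.map((m) => m.job_id)));
};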
66 changes: 65 additions & 1 deletion src/v0/destinations/bqstream/util.js
@@ -1,4 +1,5 @@
/* eslint-disable no-param-reassign */
const _ = require('lodash');
const getValue = require('get-value');
const {
getDynamicErrorType,
@@ -143,4 +144,67 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
function splitArray(arr, delimiter) {
const result = [];
let subarray = [];

for (const item of arr) {
if (item === delimiter) {
if (subarray.length > 0) {
result.push([...subarray]);
}
subarray = [];
} else {
subarray.push(item);
}
}

if (subarray.length > 0) {
result.push([...subarray]);
}

return result;
}
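
// Example (illustrative only): splitArray splits on a delimiter value, drops the
// delimiter itself, and skips empty groups, e.g.
//   splitArray(['a', '|', 'b', 'c', '|', 'd'], '|')  ->  [['a'], ['b', 'c'], ['d']]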

const filterAndSplitEvents = (eachEventTypeArray) => {
const delimiter = 'track';
let delimiterArray = [];
const resultArray = [];
for (const item of eachEventTypeArray) {
if (item.message.type === delimiter) {
delimiterArray.push(item);
} else {
if (delimiterArray.length > 0) {
resultArray.push(delimiterArray);
delimiterArray = [];
}
resultArray.push([item]);
}
}
// Push any remaining delimiterArray
if (delimiterArray.length > 0) {
resultArray.push(delimiterArray);
}
return resultArray;
};


/**
* Groups and orders events based on userId and job_id.
*
* @param {Array} inputs - An array of objects representing events, where each object has a `metadata` property containing `userId` and `job_id`.
* @returns {Array} - An array of events grouped by `userId` and ordered by `job_id`. Each element in the array represents a group of events with the same `userId`.
*/
const getGroupedEvents = (inputs) => {
const typeBasedOrderedEvents = [];
const userIdEventMap = _.groupBy(inputs, 'metadata.userId');
const eventGroupedByUserId = Object.values(userIdEventMap);
eventGroupedByUserId.forEach((eachUserJourney) => {
const eachEventTypeArray = filterAndSplitEvents(eachUserJourney);
typeBasedOrderedEvents.push(eachEventTypeArray);
});
const flattenedArray = typeBasedOrderedEvents.flat();
return flattenedArray; // u1 : [identify, track], u2: [identify, track]
};

module.exports = { networkHandler, getGroupedEvents };
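
A small usage sketch of the new getGroupedEvents export, assuming it is called from this destination's directory. The sample events are hypothetical and only carry the fields the helper reads (message.type, metadata.userId, metadata.job_id).

const { getGroupedEvents } = require('./util');

// Hypothetical router inputs for two users.
const sample = [
  { message: { type: 'identify' }, metadata: { userId: 'u1', job_id: 1 } },
  { message: { type: 'track' }, metadata: { userId: 'u1', job_id: 2 } },
  { message: { type: 'track' }, metadata: { userId: 'u1', job_id: 3 } },
  { message: { type: 'identify' }, metadata: { userId: 'u2', job_id: 4 } },
  { message: { type: 'track' }, metadata: { userId: 'u2', job_id: 5 } },
];

const chunks = getGroupedEvents(sample);
// Consecutive 'track' events of one user stay in the same chunk; every other
// event type is isolated into its own chunk. job_ids per chunk:
// [[1], [2, 3], [4], [5]]
console.log(chunks.map((chunk) => chunk.map((e) => e.metadata.job_id)));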
62 changes: 62 additions & 0 deletions src/v0/destinations/bqstream/util.test.js

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions test/integrations/destinations/bqstream/router/data.ts
@@ -73,6 +73,7 @@ export const data = [
},
metadata: {
jobId: 1,
userId: 'user12345',
},
destination: {
Config: {
@@ -153,6 +154,7 @@ export const data = [
},
metadata: {
jobId: 2,
userId: 'user12345',
},
destination: {
Config: {
@@ -203,9 +205,11 @@ export const data = [
metadata: [
{
jobId: 1,
userId: 'user12345',
},
{
jobId: 2,
userId: 'user12345',
},
],
batched: true,
