Skip to content

Commit

Permalink
fix: code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Sep 8, 2023
1 parent 261aea2 commit 28a2933
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 1,144 deletions.
72 changes: 35 additions & 37 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,53 +104,51 @@ const processRouterDest = (inputs) => {
return errorRespEvents;
}
const groupedEvents = getGroupedEvents(inputs);
// eslint-disable-next-line sonarjs/no-unused-collection
const finalResp = [];
groupedEvents.forEach((eventList) => {
groupedEvents.forEach((eventList) => {
let eventsChunk = []; // temporary variable to divide payload into chunks
let errorRespList = [];
if (eventList.length > 0) {
eventList.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);

Check warning on line 116 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L116

Added line #L116 was not covered by tests
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
// divide the successful payloads till now into batches
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
// clear up the temporary variable
eventsChunk = [];
errorRespList.push(errRespEvent);
finalResp.push([...batchedResponseList, ...errorRespList]);
// putting it back as an empty array
errorRespList = [];
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);

Check warning on line 130 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L130

Added line #L130 was not covered by tests
// divide the successful payloads till now into batches
let batchedResponseList = [];

Check warning on line 132 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L132

Added line #L132 was not covered by tests
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);

Check warning on line 134 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L134

Added line #L134 was not covered by tests
}
// clear up the temporary variable
eventsChunk = [];
errorRespList.push(errRespEvent);
finalResp.push([...batchedResponseList, ...errorRespList]);

Check warning on line 139 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L137-L139

Added lines #L137 - L139 were not covered by tests
// putting it back as an empty array
errorRespList = [];

Check warning on line 141 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L141

Added line #L141 was not covered by tests
}
});
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
finalResp.push([...batchedResponseList]);
}
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
finalResp.push([...batchedResponseList]);
}
}
});
const allBatchedEvents =_.sortBy(finalResp.flat(), ['metadata.job_id']);
return allBatchedEvents;
});
return finalResp.flat();
};

module.exports = { process, processRouterDest };
49 changes: 7 additions & 42 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,51 +145,16 @@ function networkHandler() {
}

/**
* Splits an array of events into subarrays of track event lists.
* If any other event type is encountered, it is kept as a separate subarray.
*
* @param {Array} eachUserJourney - An array of events. eg. [track, track,identify,identify,track, track]
* @returns {Array} - An array of subarrays. eg [[track, track],[identify],[identify],[track, track]]
*/
const filterAndSplitEvents = (eachUserJourney) => {
const delimiter = 'track';
let delimiterArray = [];
const resultArray = []
for (const item of eachUserJourney) {
if (item.message.type === delimiter) {
delimiterArray.push(item);
} else {
if(delimiterArray.length > 0) {
resultArray.push(delimiterArray);
delimiterArray = [];
}
resultArray.push([item]);
}
}
// Push any remaining delimiterArray
if (delimiterArray.length > 0) {
resultArray.push(delimiterArray);
}
return resultArray.flat();
};


/**
* Groups the input events based on the `userId` property and filters and splits the events based on a delimiter.
*
* @param {Array} inputs - An array of objects representing events with `metadata.userId` and `message.type` properties.
* @returns {Array} An array of arrays containing the grouped and filtered events.
* Each inner array represents a user journey and contains the filtered events.
* Groups the input events based on the `userId` property
*
* @param {Array} inputs - An array of objects representing events with `metadata.userId` property.
* @returns {Array} An array of arrays containing the grouped events.
* Each inner array represents a user journey.
*/
const getGroupedEvents = (inputs) => {
const typeBasedOrderedEvents = [];
const userIdEventMap = _.groupBy(inputs, 'metadata.userId');
const eventGroupedByUserId = Object.values(userIdEventMap);
eventGroupedByUserId.forEach((eachUserJourney) => {
const eachEventTypeArray = filterAndSplitEvents(eachUserJourney);
typeBasedOrderedEvents.push(eachEventTypeArray);
});
return typeBasedOrderedEvents;
}
return eventGroupedByUserId;
};

module.exports = { networkHandler, getGroupedEvents };
62 changes: 0 additions & 62 deletions src/v0/destinations/bqstream/util.test.js

This file was deleted.

Loading

0 comments on commit 28a2933

Please sign in to comment.