Skip to content

Commit

Permalink
fix: gaoc batching order
Browse files Browse the repository at this point in the history
  • Loading branch information
Gauravudia committed Feb 6, 2024
1 parent c7c3110 commit 49e2a1a
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 495 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
CALL_CONVERSION,
Expand Down Expand Up @@ -229,7 +230,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
.concat(storeSalesEventsBatchedResponseList)
.concat(clickCallEvents)
.concat(errorRespList);
return batchedResponseList;
return combineBatchRequestsWithSameJobIds(batchedResponseList);
};

module.exports = { process, processRouterDest };
2 changes: 1 addition & 1 deletion src/v0/destinations/mp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
handleRtTfSingleEventError,
groupEventsByType,
parseConfigArray,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
ConfigCategory,
Expand All @@ -33,7 +34,6 @@ const {
createIdentifyResponse,
isImportAuthCredentialsAvailable,
buildUtmParams,
combineBatchRequestsWithSameJobIds,
groupEventsByEndpoint,
batchEvents,
trimTraits,
Expand Down
91 changes: 0 additions & 91 deletions src/v0/destinations/mp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,44 +139,6 @@ const isImportAuthCredentialsAvailable = (destination) =>
destination.Config.serviceAccountUserName &&
destination.Config.projectId);

/**
* Finds an existing batch based on metadata JobIds from the provided batch and metadataMap.
* @param {*} batch
* @param {*} metadataMap The map containing metadata items indexed by JobIds.
* @returns
*/
const findExistingBatch = (batch, metadataMap) => {
let existingBatch = null;

// eslint-disable-next-line no-restricted-syntax
for (const metadataItem of batch.metadata) {
if (metadataMap.has(metadataItem.jobId)) {
existingBatch = metadataMap.get(metadataItem.jobId);
break;
}
}

return existingBatch;
};

/**
* Removes duplicate metadata within each merged batch object.
* @param {*} mergedBatches An array of merged batch objects.
*/
const removeDuplicateMetadata = (mergedBatches) => {
mergedBatches.forEach((batch) => {
const metadataSet = new Set();
// eslint-disable-next-line no-param-reassign
batch.metadata = batch.metadata.filter((metadataItem) => {
if (!metadataSet.has(metadataItem.jobId)) {
metadataSet.add(metadataItem.jobId);
return true;
}
return false;
});
});
};

/**
* Builds UTM parameters from a campaign object.
*
Expand Down Expand Up @@ -273,58 +235,6 @@ const batchEvents = (successRespList, maxBatchSize, reqMetadata) => {
});
};

/**
* Combines batched requests with the same JobIds.
* @param {*} inputBatches The array of batched request objects.
* @returns The combined batched requests with merged JobIds.
*
*/
const combineBatchRequestsWithSameJobIds = (inputBatches) => {
const combineBatches = (batches) => {
const clonedBatches = [...batches];
const mergedBatches = [];
const metadataMap = new Map();

clonedBatches.forEach((batch) => {
const existingBatch = findExistingBatch(batch, metadataMap);

if (existingBatch) {
// Merge batchedRequests arrays
existingBatch.batchedRequest = [
...(Array.isArray(existingBatch.batchedRequest)
? existingBatch.batchedRequest
: [existingBatch.batchedRequest]),
...(Array.isArray(batch.batchedRequest) ? batch.batchedRequest : [batch.batchedRequest]),
];

// Merge metadata
batch.metadata.forEach((metadataItem) => {
if (!metadataMap.has(metadataItem.jobId)) {
metadataMap.set(metadataItem.jobId, existingBatch);
}
existingBatch.metadata.push(metadataItem);
});
} else {
mergedBatches.push(batch);
batch.metadata.forEach((metadataItem) => {
metadataMap.set(metadataItem.jobId, batch);
});
}
});

// Remove duplicate metadata within each merged object
removeDuplicateMetadata(mergedBatches);

return mergedBatches;
};
// We need to run this twice because in first pass some batches might not get merged
// and in second pass they might get merged
// Example: [[{jobID:1}, {jobID:2}], [{jobID:3}], [{jobID:1}, {jobID:3}]]
// 1st pass: [[{jobID:1}, {jobID:2}, {jobID:3}], [{jobID:3}]]
// 2nd pass: [[{jobID:1}, {jobID:2}, {jobID:3}]]
return combineBatches(combineBatches(inputBatches));
};

/**
* Trims the traits and contextTraits objects based on the setOnceProperties array and returns an object containing the modified traits, contextTraits, and setOnce properties.
*
Expand Down Expand Up @@ -398,6 +308,5 @@ module.exports = {
groupEventsByEndpoint,
generateBatchedPayloadForArray,
batchEvents,
combineBatchRequestsWithSameJobIds,
trimTraits,
};
230 changes: 0 additions & 230 deletions src/v0/destinations/mp/util.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const {
combineBatchRequestsWithSameJobIds,
groupEventsByEndpoint,
batchEvents,
generateBatchedPayloadForArray,
Expand Down Expand Up @@ -263,235 +262,6 @@ describe('Mixpanel utils test', () => {
});
});

describe('Unit test cases for combineBatchRequestsWithSameJobIds', () => {
it('Combine batch request with same jobIds', async () => {
const input = [
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/track/',
},
metadata: [
{
jobId: 1,
},
{
jobId: 4,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/import/',
},
metadata: [
{
jobId: 3,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/track/',
},
metadata: [
{
jobId: 5,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/engage/',
},
metadata: [
{
jobId: 1,
},
{
jobId: 3,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/import/',
},
metadata: [
{
jobId: 6,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
];

const expectedOutput = [
{
batchedRequest: [
{
endpoint: 'https://api.mixpanel.com/track/',
},
{
endpoint: 'https://api.mixpanel.com/engage/',
},
{
endpoint: 'https://api.mixpanel.com/import/',
},
],
metadata: [
{
jobId: 1,
},
{
jobId: 4,
},
{
jobId: 3,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/track/',
},
metadata: [
{
jobId: 5,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/import/',
},
metadata: [
{
jobId: 6,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
];
expect(combineBatchRequestsWithSameJobIds(input)).toEqual(expectedOutput);
});

it('Each batchRequest contains unique jobIds (no event multiplexing)', async () => {
const input = [
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/track/',
},
metadata: [
{
jobId: 1,
},
{
jobId: 4,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/engage/',
},
metadata: [
{
jobId: 2,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/engage/',
},
metadata: [
{
jobId: 5,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
];

const expectedOutput = [
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/track/',
},

metadata: [
{
jobId: 1,
},
{
jobId: 4,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/engage/',
},
metadata: [
{
jobId: 2,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
{
batchedRequest: {
endpoint: 'https://api.mixpanel.com/engage/',
},
metadata: [
{
jobId: 5,
},
],
batched: true,
statusCode: 200,
destination: destinationMock,
},
];
expect(combineBatchRequestsWithSameJobIds(input)).toEqual(expectedOutput);
});
});

describe('Unit test cases for generateBatchedPayloadForArray', () => {
it('should generate a batched payload with GZIP payload for /import endpoint when given an array of events', () => {
const events = [
Expand Down
Loading

0 comments on commit 49e2a1a

Please sign in to comment.