Skip to content

Commit

Permalink
Merge branch 'develop' into fix.bugsnag-fb
Browse files — browse the repository at this point in the history
  • Loading branch information
sandeepdsvs authored Feb 13, 2024
2 parents b1b9d1a + fcc33cb commit 8a9ffaf
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 509 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ dist

# Others
**/.DS_Store

.dccache

.idea

Expand Down
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.6",
"@rudderstack/integrations-lib": "^0.2.2",
"@rudderstack/workflow-engine": "^0.6.9",
"@rudderstack/workflow-engine": "^0.7.2",
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
"ajv-formats": "^2.1.1",
Expand Down
15 changes: 11 additions & 4 deletions src/cdk/v2/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,20 @@ export async function getWorkflowEngine(

const workflowEnginePromiseMap = new Map();

export function getCachedWorkflowEngine(
export async function getCachedWorkflowEngine(
destName: string,
feature: string,
bindings: Record<string, unknown> = {},
): WorkflowEngine {
): Promise<WorkflowEngine> {
// Create a new instance of the engine for the destination if needed
// TODO: Use cache to avoid long living engine objects
workflowEnginePromiseMap[destName] = workflowEnginePromiseMap[destName] || new Map();
if (!workflowEnginePromiseMap[destName][feature]) {
workflowEnginePromiseMap[destName][feature] = getWorkflowEngine(destName, feature, bindings);
workflowEnginePromiseMap[destName][feature] = await getWorkflowEngine(
destName,
feature,
bindings,
);
}
return workflowEnginePromiseMap[destName][feature];
}
Expand Down Expand Up @@ -97,5 +101,8 @@ export function executeStep(
): Promise<StepOutput> {
return workflowEngine
.getStepExecutor(stepName)
.execute(input, Object.assign(workflowEngine.bindings, getEmptyExecutionBindings(), bindings));
.execute(
input,
Object.assign(workflowEngine.getBindings(), getEmptyExecutionBindings(), bindings),
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
CALL_CONVERSION,
Expand Down Expand Up @@ -229,7 +230,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
.concat(storeSalesEventsBatchedResponseList)
.concat(clickCallEvents)
.concat(errorRespList);
return batchedResponseList;
return combineBatchRequestsWithSameJobIds(batchedResponseList);
};

module.exports = { process, processRouterDest };
2 changes: 1 addition & 1 deletion src/v0/destinations/mp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
handleRtTfSingleEventError,
groupEventsByType,
parseConfigArray,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
ConfigCategory,
Expand All @@ -33,7 +34,6 @@ const {
createIdentifyResponse,
isImportAuthCredentialsAvailable,
buildUtmParams,
combineBatchRequestsWithSameJobIds,
groupEventsByEndpoint,
batchEvents,
trimTraits,
Expand Down
91 changes: 0 additions & 91 deletions src/v0/destinations/mp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,44 +139,6 @@ const isImportAuthCredentialsAvailable = (destination) =>
destination.Config.serviceAccountUserName &&
destination.Config.projectId);

/**
 * Finds an existing batch based on metadata JobIds from the provided batch and metadataMap.
 * @param {*} batch
 * @param {*} metadataMap The map containing metadata items indexed by JobIds.
 * @returns The previously-registered batch sharing a jobId, or null when none matches.
 */
const findExistingBatch = (batch, metadataMap) => {
  // Return the first batch already registered under any of this batch's jobIds.
  // eslint-disable-next-line no-restricted-syntax
  for (const item of batch.metadata) {
    if (metadataMap.has(item.jobId)) {
      return metadataMap.get(item.jobId);
    }
  }
  return null;
};

/**
 * Removes duplicate metadata within each merged batch object.
 * Keeps only the first occurrence of each jobId; mutates batches in place.
 * @param {*} mergedBatches An array of merged batch objects.
 */
const removeDuplicateMetadata = (mergedBatches) => {
  mergedBatches.forEach((batch) => {
    const seenJobIds = new Set();
    // eslint-disable-next-line no-param-reassign
    batch.metadata = batch.metadata.filter(({ jobId }) => {
      if (seenJobIds.has(jobId)) {
        return false;
      }
      seenJobIds.add(jobId);
      return true;
    });
  });
};

/**
* Builds UTM parameters from a campaign object.
*
Expand Down Expand Up @@ -273,58 +235,6 @@ const batchEvents = (successRespList, maxBatchSize, reqMetadata) => {
});
};

/**
 * Combines batched requests with the same JobIds.
 * Batches that share any jobId are merged: their batchedRequest payloads are
 * concatenated and their metadata lists unioned (first occurrence of each jobId kept).
 * @param {*} inputBatches The array of batched request objects.
 * @returns The combined batched requests with merged JobIds.
 */
const combineBatchRequestsWithSameJobIds = (inputBatches) => {
  // Normalize a batchedRequest value (single object or array) to an array.
  const asArray = (value) => (Array.isArray(value) ? value : [value]);

  const combineBatches = (batches) => {
    const mergedBatches = [];
    const metadataMap = new Map();

    // eslint-disable-next-line no-restricted-syntax
    for (const batch of [...batches]) {
      const existingBatch = findExistingBatch(batch, metadataMap);

      if (existingBatch) {
        // Fold this batch's request payload(s) into the batch it overlaps with.
        existingBatch.batchedRequest = [
          ...asArray(existingBatch.batchedRequest),
          ...asArray(batch.batchedRequest),
        ];

        // Register any new jobIds against the existing batch and carry the metadata over.
        // eslint-disable-next-line no-restricted-syntax
        for (const metadataItem of batch.metadata) {
          if (!metadataMap.has(metadataItem.jobId)) {
            metadataMap.set(metadataItem.jobId, existingBatch);
          }
          existingBatch.metadata.push(metadataItem);
        }
      } else {
        // No overlap yet: keep the batch and index all of its jobIds.
        mergedBatches.push(batch);
        // eslint-disable-next-line no-restricted-syntax
        for (const metadataItem of batch.metadata) {
          metadataMap.set(metadataItem.jobId, batch);
        }
      }
    }

    // Drop duplicate metadata entries introduced while merging.
    removeDuplicateMetadata(mergedBatches);

    return mergedBatches;
  };
  // We need to run this twice because in first pass some batches might not get merged
  // and in second pass they might get merged
  // Example: [[{jobID:1}, {jobID:2}], [{jobID:3}], [{jobID:1}, {jobID:3}]]
  // 1st pass: [[{jobID:1}, {jobID:2}, {jobID:3}], [{jobID:3}]]
  // 2nd pass: [[{jobID:1}, {jobID:2}, {jobID:3}]]
  return combineBatches(combineBatches(inputBatches));
};

/**
* Trims the traits and contextTraits objects based on the setOnceProperties array and returns an object containing the modified traits, contextTraits, and setOnce properties.
*
Expand Down Expand Up @@ -398,6 +308,5 @@ module.exports = {
groupEventsByEndpoint,
generateBatchedPayloadForArray,
batchEvents,
combineBatchRequestsWithSameJobIds,
trimTraits,
};
Loading

0 comments on commit 8a9ffaf

Please sign in to comment.