Skip to content

Commit

Permalink
chore: intorduce batching
Browse files Browse the repository at this point in the history
  • Loading branch information
anantjain45823 committed Jun 28, 2024
1 parent 37e3099 commit 1e7f479
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 22 deletions.
68 changes: 46 additions & 22 deletions src/v0/destinations/dyi/transform.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
/* eslint-disable no-nested-ternary */
const get = require('get-value');
const set = require('set-value');
import { JsonTemplateEngine } from 'https://cdn.jsdelivr.net/npm/@rudderstack/[email protected]/build/json-template.min.js';
const { ConfigurationError } = require('@rudderstack/integrations-lib');
const { ConfigurationError, InstrumentationError } = require('@rudderstack/integrations-lib');
const {
defaultPostRequestConfig,
defaultPutRequestConfig,
defaultPatchRequestConfig,
defaultGetRequestConfig,
defaultRequestConfig,
getFieldValueFromMessage,
flattenJson,
isDefinedAndNotNull,
getHashFromArray,
simpleProcessRouterDest,
applyCustomMappings
} = require('../../util');

const { groupEvents } = require('./utils')
const { EventType } = require('../../../constants');

const buildRequestPayload = (payload, method, headers, params) => {
Expand Down Expand Up @@ -62,20 +52,23 @@ const trackResponseBuilder = (message, destination) => {
}
]
*/
const eventRequest = track.filter((key) => {
return message.event === key.trackEventName;
});
eventRequest.forEach(request => {
const eventRequest = track.filter((key) => {
return message.event === key.trackEventName;
});
eventRequest.forEach(request => {
const { trackEndpoint, trackMethod, trackHeaders, trackQueryParams, trackPathVariables, trackParameterMapping } = request;
const headers = getHashFromArray(trackHeaders);
const params = getHashFromArray(trackQueryParams);
const pathVariables = getHashFromArray(trackPathVariables);
const endpoint = trackEndpoint.replace(/{(\w+)}/g, (_, key) => {
if (!pathVariables[key]) {
throw new Error(`Key ${key} not found in the pathVariables`);
throw new InstrumentationError(`Key ${key} not found in the pathVariables`);
}
return pathVariables;
});
if (endpoint.length === 0) {
throw new ConfigurationError('Endpoint is missing');
}
const payload = applyCustomMappings(message, trackParameterMapping);
respList.push(buildRequestPayload(endpoint, payload, trackMethod, headers, params))
});
Expand Down Expand Up @@ -144,14 +137,45 @@ const processEvent = (event) => {
}
};
const process = (event) => {
const response = processEvent({ ...event, DESTINATION });
const response = processEvent(event);
return response;
};

const processRouterDest = async (inputs, reqMetadata) => {
const destNameRichInputs = inputs.map((input) => ({ ...input, DESTINATION }));
const respList = simpleProcessRouterDest(destNameRichInputs, processEvent, reqMetadata);
return respList;
const processRouterDest = (inputs, reqMetadata) => {
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
const { destination } = inputs[0];
inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
successRespList.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const transformedPayload = {
message: process(event),
metadata: event.metadata,
destination,
};
successRespList.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
});
if (successRespList.length > 0) {
const { destination } = inputs[0];
const { enableBatching, batchMaxSize } = destination;
batchResponseList = enableBatching && batchMaxSize > 1 ? groupEvents(successRespList, batchMaxSize) : successRespList;

}
return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { processEvent, process, processRouterDest };
90 changes: 90 additions & 0 deletions src/v0/destinations/dyi/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
const crypto = require('crypto');
const {
getSuccessRespEvents,
defaultBatchRequestConfig,
} = require('../../util');
/**
* This fucntion calculates hash of incoming event
* @param {*} event
* @returns
* Example event : {
endpoint: 'https://www.abc.com/{userId}/{anonId}',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer token'
},
params: { userId: '123', anonId: '456' },
method: 'GET'
};
*/
const getHash = (event) => {
const { endpoint, headers, params, method } = event;

const data = {
endpoint,
headers,
params,
method
};
const dataString = JSON.stringify(data);
const hash = crypto.createHash('sha256').update(dataString).digest('hex');
return hash;
};

const generateBatchedPayloadForArray = (events) => {
let batchEventResponse = defaultBatchRequestConfig();
const metadata = [];
// extracting destination from the first event in a batch
const { message, destination } = events[0];

const jsonArray = [];
// Batch event into destination batch structure
events.forEach((event) => {
jsonArray.push(event.message.body.JSON);
metadata.push(event.metadata);
});
batchEventResponse.batchedRequest.endpoint = message.endpoint;
batchEventResponse.batchedRequest.method = message.method;
batchEventResponse.batchedRequest.headers = message.headers;
batchEventResponse.batchedRequest.params = message.params;
batchEventResponse.batchedRequest.body.JSON = jsonArray;
batchEventResponse = {
...batchEventResponse,
metadata,
destination,
};
return batchEventResponse;
};
const batchEvents = (groupedEvents, maxBatchSize) => {
// batching and chunking logic
Object.keys(groupedEvents).forEach(group => {
const eventChunks = lodash.chunk(groupedEvents[group], maxBatchSize);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatchedPayloadForArray(chunk, combination);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
}
const groupEvents = (batch, maxBatchSize) => {
const groupedEvents = {};
let batchEvents = [];
// grouping events
batch.forEach(event => {
const eventHash = getHash(event);
if (!groupedEvents[eventHash]) {
groupedEvents[eventHash] = [event];
} else {
groupedEvents[eventHash].push(event);
}
});
return batchEvents(groupedEvents, maxBatchSize);
};

module.exports = { groupEvents };

0 comments on commit 1e7f479

Please sign in to comment.