Skip to content

Commit

Permalink
feat: cm360 router batching (#2836)
Browse files Browse the repository at this point in the history
* fix: timestamp microseconds input cm360

* feat: batching in cm360

* chore: addressed comments

* chore: addressed comments

* fix: return aborted inside cath block

* feat: update batching partial events

* feat: update batching partial events

* feat: update batching partial events

* feat: update batching partial events

* chore: removed hardcoded response

* feat: added new error class

* chore: develop merge

* feat: handle metadata as obj

* feat: handle metadata as obj

* chore: removed commented code

* fix: merged develop

* fix: merged develop

* chore: added tests for batching

* chore: added tests for batching
  • Loading branch information
aashishmalik authored and anantjain45823 committed Nov 30, 2023
1 parent dac7613 commit 02b47fe
Show file tree
Hide file tree
Showing 9 changed files with 999 additions and 55 deletions.
17 changes: 16 additions & 1 deletion src/services/destination/postTransformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ import { ErrorReportingService } from '../errorReporting';
import tags from '../../v0/util/tags';
import stats from '../../util/stats';

type ErrorResponse = {
status?: number;
message?: string;
destinationResponse?: object;
statTags?: object;
authErrorCategory?: string | undefined;
response?: object | undefined;
};

export class DestinationPostTransformationService {
public static handleProcessorTransformSucessEvents(
event: ProcessorTransformationRequest,
Expand Down Expand Up @@ -139,7 +148,7 @@ export class DestinationPostTransformationService {
}

public static handleDeliveryFailureEvents(
error: NonNullable<unknown>,
error: ErrorResponse,
metaTo: MetaTransferObject,
): DeliveryResponse {
const errObj = generateErrorObject(error, metaTo.errorDetails, false);
Expand All @@ -152,6 +161,12 @@ export class DestinationPostTransformationService {
authErrorCategory: errObj.authErrorCategory,
}),
} as DeliveryResponse;

// for transformer-proxy to maintain contract
const { response } = error;
if (response) {
resp.response = response;
}
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type DeliveryResponse = {
destinationResponse: any;
statTags: object;
authErrorCategory?: string;
response?: object;
};

enum MessageType {
Expand Down
54 changes: 29 additions & 25 deletions src/v0/destinations/am/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,12 @@ const processSingleMessage = (message, destination) => {
const { name, event, properties } = message;
const messageType = message.type.toLowerCase();
const CATEGORY_KEY = 'properties.category';
const { useUserDefinedPageEventName, userProvidedPageEventString,
useUserDefinedScreenEventName, userProvidedScreenEventString } = destination.Config;
const {
useUserDefinedPageEventName,
userProvidedPageEventString,
useUserDefinedScreenEventName,
userProvidedScreenEventString,
} = destination.Config;
switch (messageType) {
case EventType.IDENTIFY:
payloadObjectName = 'events'; // identify same as events
Expand All @@ -602,17 +606,17 @@ const processSingleMessage = (message, destination) => {
case EventType.PAGE:
if (useUserDefinedPageEventName) {
const getMessagePath = userProvidedPageEventString
.substring(
userProvidedPageEventString.indexOf('{') + 2,
userProvidedPageEventString.indexOf('}'),
)
.trim();
.substring(
userProvidedPageEventString.indexOf('{') + 2,
userProvidedPageEventString.indexOf('}'),
)
.trim();
evType =
userProvidedPageEventString.trim() === ''
? name
: userProvidedPageEventString
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
userProvidedPageEventString.trim() === ''
? name
: userProvidedPageEventString
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
} else {
evType = `Viewed ${name || get(message, CATEGORY_KEY) || ''} Page`;
}
Expand All @@ -625,25 +629,25 @@ const processSingleMessage = (message, destination) => {
case EventType.SCREEN:
{
const { eventType, updatedProperties } = getScreenevTypeAndUpdatedProperties(
message,
CATEGORY_KEY,
message,
CATEGORY_KEY,
);
let customScreenEv = '';
if (useUserDefinedScreenEventName) {
const getMessagePath = userProvidedScreenEventString
.substring(
userProvidedScreenEventString.indexOf('{') + 2,
userProvidedScreenEventString.indexOf('}'),
)
.trim();
.substring(
userProvidedScreenEventString.indexOf('{') + 2,
userProvidedScreenEventString.indexOf('}'),
)
.trim();
customScreenEv =
userProvidedScreenEventString.trim() === ''
? name
: userProvidedScreenEventString
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
userProvidedScreenEventString.trim() === ''
? name
: userProvidedScreenEventString
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
}
evType =useUserDefinedScreenEventName ? customScreenEv : eventType;
evType = useUserDefinedScreenEventName ? customScreenEv : eventType;
message.properties = updatedProperties;
category = ConfigCategory.SCREEN;
}
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/campaign_manager/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const ConfigCategories = {
},
};

const MAX_BATCH_CONVERSATIONS_SIZE = 1000;

const EncryptionEntityType = [
'ENCRYPTION_ENTITY_TYPE_UNKNOWN',
'DCM_ACCOUNT',
Expand All @@ -28,4 +30,5 @@ module.exports = {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
};
35 changes: 29 additions & 6 deletions src/v0/destinations/campaign_manager/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-restricted-syntax */
const { AbortedError, RetryableError, NetworkError } = require('@rudderstack/integrations-lib');
const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../util/index');
Expand All @@ -9,22 +10,44 @@ const {
const tags = require('../../util/tags');

function checkIfFailuresAreRetryable(response) {
const { status } = response;
try {
if (Array.isArray(response.status) && Array.isArray(response.status[0].errors)) {
return (
response.status[0].errors[0].code !== 'PERMISSION_DENIED' &&
response.status[0].errors[0].code !== 'INVALID_ARGUMENT'
);
if (Array.isArray(status)) {
// iterate over each status, and if found retryable in conversations ..retry else discard
/* status : [{
"conversion": {
object (Conversion)
},
"errors": [
{
object (ConversionError)
}
],
"kind": string
}] */
for (const st of status) {
for (const err of st.errors) {
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'
) {
return false;
}
}
}
}
return true;
} catch (e) {
return true;
return false;
}
}

const responseHandler = (destinationResponse) => {
const message = `[CAMPAIGN_MANAGER Response Handler] - Request Processed Successfully`;
const { response, status } = destinationResponse;

if (isHttpStatusSuccess(status)) {
// check for Failures
if (response.hasFailures === true) {
Expand Down
113 changes: 109 additions & 4 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const lodash = require('lodash');
const { EventType } = require('../../../constants');

const {
constructPayload,
defaultRequestConfig,
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
simpleProcessRouterDest,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');

Expand All @@ -17,6 +20,7 @@ const {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
} = require('./config');

const { convertToMicroseconds } = require('./util');
Expand Down Expand Up @@ -178,9 +182,110 @@ function process(event) {
return response;
}

const generateBatch = (eventKind, events) => {
const batchRequestObject = defaultBatchRequestConfig();
const conversions = [];
let encryptionInfo = {};
const metadata = [];
// extracting destination, message from the first event in a batch
const { destination, message } = events[0];
// Batch event into dest batch structure
events.forEach((ev) => {
conversions.push(...ev.message.body.JSON.conversions);
metadata.push(ev.metadata);
if (ev.message.body.JSON.encryptionInfo) {
encryptionInfo = ev.message.body.JSON.encryptionInfo;
}
});

batchRequestObject.batchedRequest.body.JSON = {
kind: eventKind,
conversions,
};

if (Object.keys(encryptionInfo).length > 0) {
batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo;
}

batchRequestObject.batchedRequest.endpoint = message.endpoint;

batchRequestObject.batchedRequest.headers = message.headers;

return {
...batchRequestObject,
metadata,
destination,
};
};

const batchEvents = (eventChunksArray) => {
const batchedResponseList = [];

// group batchInsert and batchUpdate payloads
const groupedEventChunks = lodash.groupBy(
eventChunksArray,
(event) => event.message.body.JSON.kind,
);
Object.keys(groupedEventChunks).forEach((eventKind) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatch(eventKind, chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
return batchedResponseList;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventChunksArray.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const proccessedRespList = process(event);
const transformedPayload = {
message: proccessedRespList,
metadata: event.metadata,
destination,
};
eventChunksArray.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
}),
);

let batchResponseList = [];
if (eventChunksArray.length > 0) {
batchResponseList = batchEvents(eventChunksArray);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
2 changes: 1 addition & 1 deletion test/__tests__/facebook_conversions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe(`${name} Tests`, () => {
});
});
});

describe("Router Tests", () => {
it("Payload", async () => {
const routerOutput = await transformer.processRouterDest(inputRouterData);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export const data = [
{
name: 'campaign_manager',
description: 'Sucess insert request',
description: 'Sucess insert request V0',
feature: 'dataDelivery',
module: 'destination',
version: 'v0',
Expand Down Expand Up @@ -135,14 +135,14 @@ export const data = [
},
output: {
response: {
status: 500,
status: 400,
body: {
output: {
status: 500,
message: 'Campaign Manager: Retrying during CAMPAIGN_MANAGER response transformation',
status: 400,
message: 'Campaign Manager: Aborting during CAMPAIGN_MANAGER response transformation',
statTags: {
errorCategory: 'network',
errorType: 'retryable',
errorType: 'aborted',
destType: 'CAMPAIGN_MANAGER',
module: 'destination',
implementation: 'native',
Expand Down
Loading

0 comments on commit 02b47fe

Please sign in to comment.