Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cm360 router batching #2836

Merged
merged 30 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f44a105
fix: timestamp microseconds input cm360
aashishmalik Sep 25, 2023
3771b3d
feat: batching in cm360
aashishmalik Sep 25, 2023
3e589e7
chore: addressed comments
aashishmalik Sep 26, 2023
44f1048
chore: addressed comments
aashishmalik Sep 26, 2023
0cf895e
fix: return aborted inside cath block
aashishmalik Sep 26, 2023
d7ca5c7
Merge branch 'develop' into fix.cm360-timestamp
aashishmalik Sep 26, 2023
88eef81
Merge branch 'fix.cm360-timestamp' of https://github.com/rudderlabs/r…
aashishmalik Oct 9, 2023
eed94e7
feat: update batching partial events
aashishmalik Oct 9, 2023
b2719e7
feat: update batching partial events
aashishmalik Oct 9, 2023
8eba95b
feat: update batching partial events
aashishmalik Oct 9, 2023
be9aa33
feat: update batching partial events
aashishmalik Oct 9, 2023
ddbbfa4
chore: removed hardcoded response
aashishmalik Oct 13, 2023
18431cb
feat: added new error class
aashishmalik Oct 17, 2023
0c21942
chore: merge conflict with develop
aashishmalik Oct 18, 2023
7679fcc
chore: develop merge
aashishmalik Oct 18, 2023
a984ad9
feat: handle metadata as obj
aashishmalik Oct 19, 2023
c09d83d
feat: handle metadata as obj
aashishmalik Oct 19, 2023
9b7416f
chore: removed commented code
aashishmalik Oct 19, 2023
f878e00
Merge branch 'main' of https://github.com/rudderlabs/rudder-transform…
aashishmalik Oct 20, 2023
c0953f9
Merge branch 'develop' of https://github.com/rudderlabs/rudder-transf…
aashishmalik Oct 20, 2023
8952559
Merge branch 'feat.transformerProxy-newContract' of https://github.co…
aashishmalik Oct 20, 2023
3b6e35a
chore: conflicts resolved
aashishmalik Nov 19, 2023
6059777
Merge branch 'develop' into feat.cm360-proxyV1
aashishmalik Nov 21, 2023
1b7ba32
fix: merged develop
aashishmalik Nov 21, 2023
7559d7b
Merge branch 'develop' into feat.cm360-proxyV1
aashishmalik Nov 27, 2023
209c8af
Merge branch 'develop' into feat.cm360-proxyV1
aashishmalik Nov 30, 2023
756ca99
fix: merged develop
aashishmalik Nov 30, 2023
68bcea7
chore: added tests for batching
aashishmalik Nov 30, 2023
db59918
chore: added tests for batching
aashishmalik Nov 30, 2023
d126f95
Merge branch 'develop' into feat.cm360-proxyV1
aashishmalik Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/services/destination/postTransformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ import { ErrorReportingService } from '../errorReporting';
import tags from '../../v0/util/tags';
import stats from '../../util/stats';

type ErrorResponse = {
status?: number;
message?: string;
destinationResponse?: object;
statTags?: object;
authErrorCategory?: string | undefined;
response?: object | undefined;
};

export class DestinationPostTransformationService {
public static handleProcessorTransformSucessEvents(
event: ProcessorTransformationRequest,
Expand Down Expand Up @@ -139,7 +148,7 @@ export class DestinationPostTransformationService {
}

public static handleDeliveryFailureEvents(
error: NonNullable<unknown>,
error: ErrorResponse,
metaTo: MetaTransferObject,
): DeliveryResponse {
const errObj = generateErrorObject(error, metaTo.errorDetails, false);
Expand All @@ -152,6 +161,12 @@ export class DestinationPostTransformationService {
authErrorCategory: errObj.authErrorCategory,
}),
} as DeliveryResponse;

// for transformer-proxy to maintain contract
const { response } = error;
if (response) {
resp.response = response;
}
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type DeliveryResponse = {
destinationResponse: any;
statTags: object;
authErrorCategory?: string;
response?: object;
};

enum MessageType {
Expand Down
2 changes: 1 addition & 1 deletion src/v0/destinations/am/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ const processSingleMessage = (message, destination) => {
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
}
evType =useUserDefinedScreenEventName ? customScreenEv : eventType;
evType = customScreenEv || eventType;
message.properties = updatedProperties;
category = ConfigCategory.SCREEN;
}
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/campaign_manager/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const ConfigCategories = {
},
};

const MAX_BATCH_CONVERSATIONS_SIZE = 1000;

const EncryptionEntityType = [
'ENCRYPTION_ENTITY_TYPE_UNKNOWN',
'DCM_ACCOUNT',
Expand All @@ -28,4 +30,5 @@ module.exports = {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
};
137 changes: 129 additions & 8 deletions src/v0/destinations/campaign_manager/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,150 @@
const { AbortedError, RetryableError, NetworkError } = require('@rudderstack/integrations-lib');
/* eslint-disable no-param-reassign */
/* eslint-disable no-restricted-syntax */
const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../util/index');

const {
processAxiosResponse,
getDynamicErrorType,
} = require('../../../adapters/utils/networkUtils');
const { TransformerProxyError } = require('../../util/errorTypes');
const tags = require('../../util/tags');

function checkIfFailuresAreRetryable(response) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
function checkIfFailuresAreRetryable(response, proxyOutputObj) {
const { status } = response;
try {
if (Array.isArray(response.status) && Array.isArray(response.status[0].errors)) {
return (
response.status[0].errors[0].code !== 'PERMISSION_DENIED' &&
response.status[0].errors[0].code !== 'INVALID_ARGUMENT'
);
if (Array.isArray(status)) {
// iterate over each status, and if found retryable in conversations ..retry else discard
/* status : [{
"conversion": {
object (Conversion)
},
"errors": [
{
object (ConversionError)
}
],
"kind": string
}] */
for (const st of status) {
for(const err of st.errors) {
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'
) {
return false;
}
}
}
}
return true;
} catch (e) {
return true;
return false;
}
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
}

function isEventRetryable(element, proxyOutputObj) {
let flag = false;
let errorMsg = "";
// success event
if (!element.errors) {
return flag;
}
for(const err of element.errors) {
errorMsg += `${err.message}, `;
if (err.code === 'INTERNAL') {
flag = true;
}
}
if (errorMsg) {
proxyOutputObj.error = errorMsg;
}
return flag;
}

function isEventAbortable(element, proxyOutputObj) {
let flag = false;
let errorMsg = "";
// success event
if (!element.errors) {
return flag;
}
for(const err of element.errors) {
errorMsg += `${err.message}, `;
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'
) {
flag = true;
}
}
if (errorMsg) {
proxyOutputObj.error = errorMsg;
}
return flag;
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
}

const responseHandler = (destinationResponse) => {
const message = `[CAMPAIGN_MANAGER Response Handler] - Request Processed Successfully`;
const { response, status } = destinationResponse;
const responseWithIndividualEvents = [];
const { response, status, rudderJobMetadata } = destinationResponse;

if (Array.isArray(rudderJobMetadata)) {
if (isHttpStatusSuccess(status)) {
// check for Partial Event failures and Successes
const destPartialStatus = response.status;

for (const [idx, element] of destPartialStatus.entries()) {
const proxyOutputObj = {
statusCode: 200,
metadata: rudderJobMetadata[idx],
error: "success"
};
// update status of partial event as per retriable or abortable
if (isEventRetryable(element, proxyOutputObj)) {
proxyOutputObj.statusCode = 500;
} else if (isEventAbortable(element, proxyOutputObj)) {
proxyOutputObj.statusCode = 400;
}
responseWithIndividualEvents.push(proxyOutputObj);
}

return {
status,
message,
destinationResponse,
response: responseWithIndividualEvents
}
}

// in case of failure status, populate response to maintain len(metadata)=len(response)
const errorMessage = response.error?.message || 'unknown error format';
for (const metadata of rudderJobMetadata) {
responseWithIndividualEvents.push({
statusCode: 500,
metadata,
error: errorMessage
});
}

throw new TransformerProxyError(
`Campaign Manager: Error proxy during CAMPAIGN_MANAGER response transformation`,
500,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
destinationResponse,
getAuthErrCategoryFromStCode(status),
responseWithIndividualEvents
);
}

if (isHttpStatusSuccess(status)) {
// check for Failures
if (response.hasFailures === true) {
Expand Down Expand Up @@ -60,6 +180,7 @@ const responseHandler = (destinationResponse) => {
destinationResponse,
getAuthErrCategoryFromStCode(status),
);

};

function networkHandler() {
Expand Down
113 changes: 109 additions & 4 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const lodash = require('lodash');
const { EventType } = require('../../../constants');

const {
constructPayload,
defaultRequestConfig,
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
simpleProcessRouterDest,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');

Expand All @@ -17,6 +20,7 @@ const {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
} = require('./config');

const { convertToMicroseconds } = require('./util');
Expand Down Expand Up @@ -178,9 +182,110 @@ function process(event) {
return response;
}

const generateBatch = (eventKind, events) => {
const batchRequestObject = defaultBatchRequestConfig();
const conversions = [];
let encryptionInfo = {};
const metadata = [];
// extracting destination, message from the first event in a batch
const { destination, message } = events[0];
// Batch event into dest batch structure
events.forEach((ev) => {
conversions.push(...ev.message.body.JSON.conversions);
metadata.push(ev.metadata);
if (ev.message.body.JSON.encryptionInfo) {
encryptionInfo = ev.message.body.JSON.encryptionInfo;
}
});

batchRequestObject.batchedRequest.body.JSON = {
kind: eventKind,
conversions,
};

if (Object.keys(encryptionInfo).length > 0) {
batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo;
}

batchRequestObject.batchedRequest.endpoint = message.endpoint;

batchRequestObject.batchedRequest.headers = message.headers;

return {
...batchRequestObject,
metadata,
destination,
};
};

const batchEvents = (eventChunksArray) => {
const batchedResponseList = [];

// group batchInsert and batchUpdate payloads
const groupedEventChunks = lodash.groupBy(
eventChunksArray,
(event) => event.message.body.JSON.kind,
);
Object.keys(groupedEventChunks).forEach((eventKind) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatch(eventKind, chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
return batchedResponseList;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventChunksArray.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const proccessedRespList = process(event);
const transformedPayload = {
message: proccessedRespList,
metadata: event.metadata,
destination,
};
eventChunksArray.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
}),
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
);
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved

let batchResponseList = [];
if (eventChunksArray.length > 0) {
batchResponseList = batchEvents(eventChunksArray);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
2 changes: 2 additions & 0 deletions src/v0/util/errorTypes/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const TransformerProxyError = require('./transformerProxyError');
const FilteredEventsError = require('./filteredEventsError');

module.exports = {
TransformerProxyError,
FilteredEventsError,
};
Loading
Loading