Skip to content

Commit

Permalink
chore: conflicts resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
aashishmalik committed Nov 19, 2023
2 parents e5f8714 + 8952559 commit 3b6e35a
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 19 deletions.
21 changes: 18 additions & 3 deletions src/services/destination/postTransformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ import { ErrorReportingService } from '../errorReporting';
import tags from '../../v0/util/tags';
import stats from '../../util/stats';

export class DestinationPostTransformationService {
type ErrorResponse = {
status?: number;
message?: string;
destinationResponse?: object;
statTags?: object;
authErrorCategory?: string | undefined;
response?: object | undefined;
};

export default class DestinationPostTransformationService {
public static handleProcessorTransformSucessEvents(
event: ProcessorTransformationRequest,
transformedPayloads: ProcessorTransformationOutput | ProcessorTransformationOutput[],
Expand Down Expand Up @@ -139,7 +148,7 @@ export class DestinationPostTransformationService {
}

public static handleDeliveryFailureEvents(
error: NonNullable<unknown>,
error: ErrorResponse,
metaTo: MetaTransferObject,
): DeliveryResponse {
const errObj = generateErrorObject(error, metaTo.errorDetails, false);
Expand All @@ -150,8 +159,14 @@ export class DestinationPostTransformationService {
statTags: errObj.statTags,
...(errObj.authErrorCategory && {
authErrorCategory: errObj.authErrorCategory,
}),
})
} as DeliveryResponse;

// for transformer-proxy to maintain contract
const { response } = error;
if (response) {
resp.response = response;
}
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type DeliveryResponse = {
destinationResponse: any;
statTags: object;
authErrorCategory?: string;
response?: object;
};

enum MessageType {
Expand Down
2 changes: 1 addition & 1 deletion src/v0/destinations/am/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ const processSingleMessage = (message, destination) => {
.trim()
.replaceAll(/{{([^{}]+)}}/g, get(message, getMessagePath));
}
evType =useUserDefinedScreenEventName ? customScreenEv : eventType;
evType = customScreenEv || eventType;
message.properties = updatedProperties;
category = ConfigCategory.SCREEN;
}
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/campaign_manager/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const ConfigCategories = {
},
};

const MAX_BATCH_CONVERSATIONS_SIZE = 1000;

const EncryptionEntityType = [
'ENCRYPTION_ENTITY_TYPE_UNKNOWN',
'DCM_ACCOUNT',
Expand All @@ -28,4 +30,5 @@ module.exports = {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
};
137 changes: 129 additions & 8 deletions src/v0/destinations/campaign_manager/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,150 @@
const { AbortedError, RetryableError, NetworkError } = require('@rudderstack/integrations-lib');
/* eslint-disable no-param-reassign */
/* eslint-disable no-restricted-syntax */
const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../util/index');

const {
processAxiosResponse,
getDynamicErrorType,
} = require('../../../adapters/utils/networkUtils');
const { TransformerProxyError } = require('../../util/errorTypes');
const tags = require('../../util/tags');

function checkIfFailuresAreRetryable(response) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
function checkIfFailuresAreRetryable(response, proxyOutputObj) {
const { status } = response;
try {
if (Array.isArray(response.status) && Array.isArray(response.status[0].errors)) {
return (
response.status[0].errors[0].code !== 'PERMISSION_DENIED' &&
response.status[0].errors[0].code !== 'INVALID_ARGUMENT'
);
if (Array.isArray(status)) {
// iterate over each status, and if found retryable in conversations ..retry else discard
/* status : [{
"conversion": {
object (Conversion)
},
"errors": [
{
object (ConversionError)
}
],
"kind": string
}] */
for (const st of status) {
for(const err of st.errors) {
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'
) {
return false;
}
}
}
}
return true;
} catch (e) {
return true;
return false;
}
}

function isEventRetryable(element, proxyOutputObj) {
let flag = false;
let errorMsg = "";
// success event
if (!element.errors) {
return flag;
}
for(const err of element.errors) {
errorMsg += `${err.message}, `;
if (err.code === 'INTERNAL') {
flag = true;
}
}
if (errorMsg) {
proxyOutputObj.error = errorMsg;
}
return flag;
}

function isEventAbortable(element, proxyOutputObj) {
let flag = false;
let errorMsg = "";
// success event
if (!element.errors) {
return flag;
}
for(const err of element.errors) {
errorMsg += `${err.message}, `;
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'
) {
flag = true;
}
}
if (errorMsg) {
proxyOutputObj.error = errorMsg;
}
return flag;
}

const responseHandler = (destinationResponse) => {
const message = `[CAMPAIGN_MANAGER Response Handler] - Request Processed Successfully`;
const { response, status } = destinationResponse;
const responseWithIndividualEvents = [];
const { response, status, rudderJobMetadata } = destinationResponse;

if (Array.isArray(rudderJobMetadata)) {
if (isHttpStatusSuccess(status)) {
// check for Partial Event failures and Successes
const destPartialStatus = response.status;

for (const [idx, element] of destPartialStatus.entries()) {
const proxyOutputObj = {
statusCode: 200,
metadata: rudderJobMetadata[idx],
error: "success"
};
// update status of partial event as per retriable or abortable
if (isEventRetryable(element, proxyOutputObj)) {
proxyOutputObj.statusCode = 500;
} else if (isEventAbortable(element, proxyOutputObj)) {
proxyOutputObj.statusCode = 400;
}
responseWithIndividualEvents.push(proxyOutputObj);
}

return {
status,
message,
destinationResponse,
response: responseWithIndividualEvents
}
}

// in case of failure status, populate response to maintain len(metadata)=len(response)
const errorMessage = response.error?.message || 'unknown error format';
for (const metadata of rudderJobMetadata) {
responseWithIndividualEvents.push({
statusCode: 500,
metadata,
error: errorMessage
});
}

throw new TransformerProxyError(
`Campaign Manager: Error proxy during CAMPAIGN_MANAGER response transformation`,
500,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
destinationResponse,
getAuthErrCategoryFromStCode(status),
responseWithIndividualEvents
);
}

if (isHttpStatusSuccess(status)) {
// check for Failures
if (response.hasFailures === true) {
Expand Down Expand Up @@ -60,6 +180,7 @@ const responseHandler = (destinationResponse) => {
destinationResponse,
getAuthErrCategoryFromStCode(status),
);

};

function networkHandler() {
Expand Down
113 changes: 109 additions & 4 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const lodash = require('lodash');
const { EventType } = require('../../../constants');

const {
constructPayload,
defaultRequestConfig,
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
simpleProcessRouterDest,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');

Expand All @@ -17,6 +20,7 @@ const {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
} = require('./config');

const { convertToMicroseconds } = require('./util');
Expand Down Expand Up @@ -178,9 +182,110 @@ function process(event) {
return response;
}

const generateBatch = (eventKind, events) => {
const batchRequestObject = defaultBatchRequestConfig();
const conversions = [];
let encryptionInfo = {};
const metadata = [];
// extracting destination, message from the first event in a batch
const { destination, message } = events[0];
// Batch event into dest batch structure
events.forEach((ev) => {
conversions.push(...ev.message.body.JSON.conversions);
metadata.push(ev.metadata);
if (ev.message.body.JSON.encryptionInfo) {
encryptionInfo = ev.message.body.JSON.encryptionInfo;
}
});

batchRequestObject.batchedRequest.body.JSON = {
kind: eventKind,
conversions,
};

if (Object.keys(encryptionInfo).length > 0) {
batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo;
}

batchRequestObject.batchedRequest.endpoint = message.endpoint;

batchRequestObject.batchedRequest.headers = message.headers;

return {
...batchRequestObject,
metadata,
destination,
};
};

const batchEvents = (eventChunksArray) => {
const batchedResponseList = [];

// group batchInsert and batchUpdate payloads
const groupedEventChunks = lodash.groupBy(
eventChunksArray,
(event) => event.message.body.JSON.kind,
);
Object.keys(groupedEventChunks).forEach((eventKind) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatch(eventKind, chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
return batchedResponseList;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventChunksArray.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const proccessedRespList = process(event);
const transformedPayload = {
message: proccessedRespList,
metadata: event.metadata,
destination,
};
eventChunksArray.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
}),
);

let batchResponseList = [];
if (eventChunksArray.length > 0) {
batchResponseList = batchEvents(eventChunksArray);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
2 changes: 2 additions & 0 deletions src/v0/util/errorTypes/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const TransformerProxyError = require('./transformerProxyError');
const FilteredEventsError = require('./filteredEventsError');

module.exports = {
TransformerProxyError,
FilteredEventsError,
};
Loading

0 comments on commit 3b6e35a

Please sign in to comment.