Skip to content

Commit

Permalink
chore: return 298 for filtered/dropped events in user transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
psrikanth88 committed Sep 26, 2023
1 parent e7dbd2c commit 367851c
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 28 deletions.
89 changes: 61 additions & 28 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import {
UserTransformationResponse,
UserTransformationServiceResponse,
} from '../types/index';
import { RespStatusError, RetryRequestError, extractStackTraceUptoLastSubstringMatch } from '../util/utils';
import {
RespStatusError,
RetryRequestError,
extractStackTraceUptoLastSubstringMatch,
} from '../util/utils';
import { getMetadata, isNonFuncObject } from '../v0/util';
import { SUPPORTED_FUNC_NAMES } from '../util/ivmFactory';
import logger from '../logger';
import stats from '../util/stats';
import { CommonUtils } from '../util/common';

export default class UserTransformService {
public static async transformRoutine(
events: ProcessorTransformationRequest[],
): Promise<UserTransformationServiceResponse> {

const startTime = new Date();
let retryStatus = 200;
const groupedEvents: Object = groupBy(
events,
Expand All @@ -43,6 +46,8 @@ export default class UserTransformService {
const transformationVersionId =
eventsToProcess[0]?.destination?.Transformations[0]?.VersionID;
const messageIds = eventsToProcess.map((ev) => ev.metadata?.messageId);
const messageIdsSet = new Set<string>(messageIds);
const messageIdsInOutputSet = new Set<string>();

const commonMetadata = {
sourceId: eventsToProcess[0]?.metadata?.sourceId,
Expand Down Expand Up @@ -76,31 +81,57 @@ export default class UserTransformService {
transformationVersionId,
librariesVersionIDs,
);
transformedEvents.push(
...destTransformedEvents.map((ev) => {
if (ev.error) {
return {
statusCode: 400,
error: ev.error,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as ProcessorTransformationResponse;
}
if (!isNonFuncObject(ev.transformedEvent)) {
return {
statusCode: 400,
error: `returned event in events from user transformation is not an object. transformationVersionId:${transformationVersionId} and returned event: ${JSON.stringify(
ev.transformedEvent,
)}`,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as ProcessorTransformationResponse;
}
return {
output: ev.transformedEvent,

const transformedEventsWithMetadata: ProcessorTransformationResponse[] = [];
destTransformedEvents.forEach((ev) => {
if (ev.error) {
transformedEventsWithMetadata.push({
statusCode: 400,
error: ev.error,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
statusCode: 200,
} as ProcessorTransformationResponse;
}),
);
} as ProcessorTransformationResponse);
return;
}
if (!isNonFuncObject(ev.transformedEvent)) {
transformedEventsWithMetadata.push({
statusCode: 400,
error: `returned event in events from user transformation is not an object. transformationVersionId:${transformationVersionId} and returned event: ${JSON.stringify(
ev.transformedEvent,
)}`,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
} as ProcessorTransformationResponse);
return;
}
// add messageId to output set
if (ev.metadata?.messageId) {
messageIdsInOutputSet.add(ev.metadata.messageId);
} else if (ev.metadata?.messageIds) {
ev.metadata.messageIds.forEach((id) => messageIdsInOutputSet.add(id));
}
transformedEventsWithMetadata.push({
output: ev.transformedEvent,
metadata: isEmpty(ev.metadata) ? commonMetadata : ev.metadata,
statusCode: 200,
} as ProcessorTransformationResponse);
});

// TODO: add feature flag based on rudder-server version to do this
// find difference between input and output messageIds
logger.debug(`messageIdsInOutputSet: ${messageIdsInOutputSet.keys()}`);
logger.debug(`messageIdsSet: ${messageIdsSet.keys()}`);
const messageIdsNotInOutput = CommonUtils.setDiff(messageIdsSet, messageIdsInOutputSet);
logger.debug(`messageIdsNotInOutput: ${messageIdsNotInOutput}`);
const droppedEvents = messageIdsNotInOutput.map((id) => ({
statusCode: 298,
metadata: {
...commonMetadata,
messageId: id,
messageIds: null,
},
}));
transformedEvents.push(...droppedEvents);

transformedEvents.push(...transformedEventsWithMetadata);
} catch (error: any) {
logger.error(error);
let status = 400;
Expand Down Expand Up @@ -172,7 +203,9 @@ export default class UserTransformService {
response.status = 200;
} catch (error: any) {
response.status = 400;
response.body = { error: extractStackTraceUptoLastSubstringMatch(error.stack, SUPPORTED_FUNC_NAMES) };
response.body = {
error: extractStackTraceUptoLastSubstringMatch(error.stack, SUPPORTED_FUNC_NAMES),
};
}
return response;
}
Expand Down
4 changes: 4 additions & 0 deletions src/util/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const CommonUtils = {
}
return [obj];
},

setDiff(mainSet, comparisionSet) {
return [...mainSet].filter((item) => !comparisionSet.has(item));
},
};

module.exports = {
Expand Down
185 changes: 185 additions & 0 deletions test/__tests__/data/user_transformation_service_filter_input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
[
{
"message": {
"channel": "web",
"context": {
"app": {
"build": "1.0.0",
"name": "RudderLabs JavaScript SDK",
"namespace": "com.rudderlabs.javascript",
"version": "1.0.0"
},
"traits": {
"anonymousId": "123456"
},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.0.0"
},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
"locale": "en-US",
"ip": "0.0.0.0",
"os": {
"name": "",
"version": ""
},
"screen": {
"density": 2
}
},
"type": "identify",
"messageId": "84e26acc-56a5-4835-8233-591137fca468",
"originalTimestamp": "2019-10-14T09:03:17.562Z",
"anonymousId": "123456",
"userId": "123456",
"integrations": {
"All": true
},
"sentAt": "2019-10-14T09:03:22.563Z"
},
"metadata": {
"messageId": "84e26acc-56a5-4835-8233-591137fca468",
"sourceId": "s1",
"destinationId": "d1"
},
"destination": {
"ID": 2,
"Config": {
"trackingID": "abcd"
},
"Enabled": true,
"Transformations": [
{
"VersionID": "24"
}
]
}
},
{
"message": {
"channel": "web",
"context": {
"app": {
"build": "1.0.0",
"name": "RudderLabs JavaScript SDK",
"namespace": "com.rudderlabs.javascript",
"version": "1.0.0"
},
"traits": {
"email": "[email protected]",
"anonymousId": "12345"
},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.0.0"
},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
"locale": "en-US",
"ip": "0.0.0.0",
"os": {
"name": "",
"version": ""
},
"screen": {
"density": 2
}
},
"type": "page",
"messageId": "5e10d13a-bf9a-44bf-b884-43a9e591ea71",
"originalTimestamp": "2019-10-14T11:15:18.299Z",
"anonymousId": "00000000000000000000000000",
"userId": "12345",
"properties": {
"path": "/abc",
"referrer": "",
"search": "",
"title": "",
"url": ""
},
"integrations": {
"All": true
},
"name": "ApplicationLoaded",
"sentAt": "2019-10-14T11:15:53.296Z"
},
"metadata": {
"messageId": "5e10d13a-bf9a-44bf-b884-43a9e591ea71",
"sourceId": "s1",
"destinationId": "d1"
},
"destination": {
"ID": 2,
"Config": {
"trackingID": "abcd"
},
"Enabled": true,
"Transformations": [
{
"VersionID": "24"
}
]
}
},
{
"message": {
"channel": "web",
"context": {
"app": {
"build": "1.0.0",
"name": "RudderLabs JavaScript SDK",
"namespace": "com.rudderlabs.javascript",
"version": "1.0.0"
},
"traits": {
"email": "[email protected]",
"anonymousId": "12345"
},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.0.0"
},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
"locale": "en-US",
"ip": "0.0.0.0",
"os": {
"name": "",
"version": ""
},
"screen": {
"density": 2
}
},
"type": "track",
"messageId": "ec5481b6-a926-4d2e-b293-0b3a77c4d3be",
"originalTimestamp": "2019-10-14T11:15:18.300Z",
"anonymousId": "00000000000000000000000000",
"userId": "12345",
"event": "test track event GA3",
"properties": {
"user_actual_role": "system_admin, system_user",
"user_actual_id": 12345
},
"integrations": {
"All": true
},
"sentAt": "2019-10-14T11:15:53.296Z"
},
"metadata": {
"messageId": "ec5481b6-a926-4d2e-b293-0b3a77c4d3be",
"sourceId": "s1",
"destinationId": "d1"
},
"destination": {
"ID": 2,
"Config": {
"trackingID": "abcd"
},
"Enabled": true,
"Transformations": [
{
"VersionID": "24"
}
]
}
}
]
74 changes: 74 additions & 0 deletions test/__tests__/data/user_transformation_service_filter_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
{
"transformedEvents": [
{
"statusCode": 298,
"metadata": {
"sourceId": "s1",
"destinationId": "d1",
"messageIds": null,
"messageId": "84e26acc-56a5-4835-8233-591137fca468"
}
},
{
"statusCode": 298,
"metadata": {
"sourceId": "s1",
"destinationId": "d1",
"messageIds": null,
"messageId": "5e10d13a-bf9a-44bf-b884-43a9e591ea71"
}
},
{
"output": {
"channel": "web",
"context": {
"app": {
"build": "1.0.0",
"name": "RudderLabs JavaScript SDK",
"namespace": "com.rudderlabs.javascript",
"version": "1.0.0"
},
"traits": {
"email": "[email protected]",
"anonymousId": "12345"
},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.0.0"
},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
"locale": "en-US",
"ip": "0.0.0.0",
"os": {
"name": "",
"version": ""
},
"screen": {
"density": 2
}
},
"type": "track",
"messageId": "ec5481b6-a926-4d2e-b293-0b3a77c4d3be",
"originalTimestamp": "2019-10-14T11:15:18.300Z",
"anonymousId": "00000000000000000000000000",
"userId": "12345",
"event": "test track event GA3",
"properties": {
"user_actual_role": "system_admin, system_user",
"user_actual_id": 12345
},
"integrations": {
"All": true
},
"sentAt": "2019-10-14T11:15:53.296Z"
},
"metadata": {
"messageId": "ec5481b6-a926-4d2e-b293-0b3a77c4d3be",
"sourceId": "s1",
"destinationId": "d1"
},
"statusCode": 200
}
],
"retryStatus": 200
}
Loading

0 comments on commit 367851c

Please sign in to comment.