Skip to content

Commit

Permalink
Merge branch 'develop' into feat.consentMode.GAOC
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 authored Mar 18, 2024
2 parents e05e730 + 5e4ddbd commit b2f4830
Show file tree
Hide file tree
Showing 53 changed files with 7,620 additions and 6,995 deletions.
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"rudder-transformer-cdk": "^1.4.11",
"set-value": "^4.1.0",
"sha256": "^0.2.0",
"sqlstring": "^2.3.3",
"stacktrace-parser": "^0.1.10",
"statsd-client": "^0.4.7",
"truncate-utf8-bytes": "^1.0.2",
Expand Down
3 changes: 3 additions & 0 deletions src/cdk/v2/destinations/movable_ink/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports = {
MAX_REQUEST_SIZE_IN_BYTES: 13500,
};
72 changes: 72 additions & 0 deletions src/cdk/v2/destinations/movable_ink/procWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
bindings:
- name: EventType
path: ../../../../constants
- path: ../../bindings/jsontemplate
- name: defaultRequestConfig
path: ../../../../v0/util
- name: toUnixTimestampInMS
path: ../../../../v0/util
- name: base64Convertor
path: ../../../../v0/util
- path: ./utils

steps:
- name: messageType
template: |
.message.type.toLowerCase();
- name: validateInput
template: |
let messageType = $.outputs.messageType;
$.assert(messageType, "message Type is not present. Aborting");
$.assert(messageType in {{$.EventType.([.IDENTIFY,.TRACK])}}, "message type " + messageType + " is not supported");
$.assertConfig(.destination.Config.endpoint, "Movable Ink Endpoint is not present. Aborting");
$.assertConfig(.destination.Config.accessKey, "Access key is not present . Aborting");
$.assertConfig(.destination.Config.accessSecret, "Access Secret is not present. Aborting");
$.assert(.message.timestamp ?? .message.originalTimestamp, "Timestamp is not present. Aborting");
const userId = .message.().(
{{{{$.getGenericPaths("userIdOnly")}}}};
);
const email = .message.().(
{{{{$.getGenericPaths("email")}}}};
);
$.assert(userId ?? email ?? .message.anonymousId, "Either one of userId or email or anonymousId is required. Aborting");
- name: preparePayload
description: Prepare payload for identify and track. This payload schema needs to be configured in the Movable Ink dashboard. Movable Ink will discard any additional fields from the input payload.
template: |
const userId = .message.().(
{{{{$.getGenericPaths("userIdOnly")}}}};
);
const email = .message.().(
{{{{$.getGenericPaths("email")}}}};
);
const timestampInUnix = $.toUnixTimestampInMS(.message.().(
{{{{$.getGenericPaths("timestamp")}}}};
));
$.context.payload = {
...(.message),
userId: userId ?? email,
timestamp: timestampInUnix,
anonymousId: .message.anonymousId
}
- name: buildResponse
description: In batchMode we return payload directly
condition: $.batchMode
template: |
$.context.payload
else:
name: buildResponseForProcessTransformation
template: |
const response = $.defaultRequestConfig();
response.body.JSON = $.context.payload;
response.endpoint = .destination.Config.endpoint;
response.method = "POST";
response.headers = {
"Content-Type": "application/json",
"Authorization": "Basic " + $.base64Convertor(.destination.Config.accessKey + ":" + .destination.Config.accessSecret)
}
response;
74 changes: 74 additions & 0 deletions src/cdk/v2/destinations/movable_ink/rtWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
bindings:
- name: handleRtTfSingleEventError
path: ../../../../v0/util/index
- path: ./utils
exportAll: true
- name: base64Convertor
path: ../../../../v0/util
- name: BatchUtils
path: '@rudderstack/workflow-engine'
- path: ./config

steps:
- name: validateInput
template: |
$.assert(Array.isArray(^) && ^.length > 0, "Invalid event array")
- name: transform
externalWorkflow:
path: ./procWorkflow.yaml
bindings:
- name: batchMode
value: true
loopOverInput: true

- name: successfulEvents
template: |
$.outputs.transform#idx.output.({
"batchedRequest": .,
"batched": false,
"destination": ^[idx].destination,
"metadata": ^[idx].metadata,
"statusCode": 200
})[]
- name: failedEvents
template: |
$.outputs.transform#idx.error.(
$.handleRtTfSingleEventError(^[idx], .originalError ?? ., {})
)[]
- name: batchSuccessfulEvents
description: Batches the successfulEvents
template: |
let batches = $.BatchUtils.chunkArrayBySizeAndLength(
$.outputs.successfulEvents, {maxSizeInBytes: $.MAX_REQUEST_SIZE_IN_BYTES}).items;
batches@batch.({
"batchedRequest": {
"body": {
"JSON": {"events": ~r batch.batchedRequest[]},
"JSON_ARRAY": {},
"XML": {},
"FORM": {}
},
"version": "1",
"type": "REST",
"method": "POST",
"endpoint": batch[0].destination.Config.().(.endpoint),
"headers": batch[0].destination.Config.().({
"Content-Type": "application/json",
"Authorization": "Basic " + $.base64Convertor(.accessKey + ":" + .accessSecret)
}),
"params": {},
"files": {}
},
"metadata": ~r batch.metadata[],
"batched": true,
"statusCode": 200,
"destination": batch[0].destination
})[];
- name: finalPayload
template: |
[...$.outputs.batchSuccessfulEvents, ...$.outputs.failedEvents]
3 changes: 2 additions & 1 deletion src/controllers/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ export class UserTransformController {
'(User transform - router:/customTransform ):: Request to transformer',
JSON.stringify(ctx.request.body),
);
const requestSize = Number(ctx.request.get('content-length'));
const events = ctx.request.body as ProcessorTransformationRequest[];
const processedRespone: UserTransformationServiceResponse =
await UserTransformService.transformRoutine(events, ctx.state.features);
await UserTransformService.transformRoutine(events, ctx.state.features, requestSize);
ctx.body = processedRespone.transformedEvents;
ControllerUtility.postProcess(ctx, processedRespone.retryStatus);
logger.debug(
Expand Down
1 change: 1 addition & 0 deletions src/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"THE_TRADE_DESK": true,
"INTERCOM": true,
"NINETAILED": true,
"MOVABLE_INK": true,
"KOALA": true
},
"regulations": [
Expand Down
14 changes: 9 additions & 5 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
RetryRequestError,
extractStackTraceUptoLastSubstringMatch,
} from '../util/utils';
import { getMetadata, isNonFuncObject } from '../v0/util';
import { getMetadata, getTransformationMetadata, isNonFuncObject } from '../v0/util';
import { SUPPORTED_FUNC_NAMES } from '../util/ivmFactory';
import logger from '../logger';
import stats from '../util/stats';
Expand All @@ -28,6 +28,7 @@ export class UserTransformService {
public static async transformRoutine(
events: ProcessorTransformationRequest[],
features: FeatureFlags = {},
requestSize = 0,
): Promise<UserTransformationServiceResponse> {
let retryStatus = 200;
const groupedEvents: NonNullable<unknown> = groupBy(
Expand Down Expand Up @@ -162,16 +163,19 @@ export class UserTransformService {
),
);
stats.counter('user_transform_errors', eventsToProcess.length, {
transformationId: eventsToProcess[0]?.metadata?.transformationId,
workspaceId: eventsToProcess[0]?.metadata?.workspaceId,
status,
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
} finally {
stats.timing('user_transform_request_latency', userFuncStartTime, {
workspaceId: eventsToProcess[0]?.metadata?.workspaceId,
transformationId: eventsToProcess[0]?.metadata?.transformationId,
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.histogram('user_transform_batch_size', requestSize, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
}

Expand Down
20 changes: 20 additions & 0 deletions src/util/prometheus.js
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,10 @@ class Prometheus {
name: 'tp_batch_size',
help: 'Size of batch of events for tracking plan validation',
type: 'histogram',
buckets: [
1024, 102400, 524288, 1048576, 10485760, 20971520, 52428800, 104857600, 209715200,
524288000,
],
labelNames: [
'sourceType',
'destinationType',
Expand Down Expand Up @@ -670,6 +674,22 @@ class Prometheus {
'k8_namespace',
],
},
{
name: 'user_transform_batch_size',
help: 'user_transform_batch_size',
type: 'histogram',
labelNames: [
'workspaceId',
'transformationId',
'sourceType',
'destinationType',
'k8_namespace',
],
buckets: [
1024, 102400, 524288, 1048576, 10485760, 20971520, 52428800, 104857600, 209715200,
524288000,
], // 1KB, 100KB, 0.5MB, 1MB, 10MB, 20MB, 50MB, 100MB, 200MB, 500MB
},
{
name: 'source_transform_request_latency',
help: 'source_transform_request_latency',
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/am/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ const responseBuilderSimple = (
...campaign,
};

// we are updating the payload with skip_user_properties_sync
AMUtils.updateWithSkipAttribute(message, rawPayload);

const respData = getResponseData(evType, destination, rawPayload, message, groupInfo);
const { groups, rawPayload: updatedRawPayload } = respData;

Expand Down
35 changes: 34 additions & 1 deletion src/v0/destinations/am/util.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
const { getUnsetObj, validateEventType, userPropertiesPostProcess } = require('./utils');
const {
getUnsetObj,
validateEventType,
userPropertiesPostProcess,
updateWithSkipAttribute,
} = require('./utils');

describe('getUnsetObj', () => {
it("should return undefined when 'message.integrations.Amplitude.fieldsToUnset' is not array", () => {
Expand Down Expand Up @@ -164,3 +169,31 @@ describe('userPropertiesPostProcess', () => {
});
});
});

describe('updateWithSkipAttribute', () => {
// when 'skipUserPropertiesSync ' is present in 'integrations.Amplitude', return the original payload.
it("should return the original payload when 'skipUserPropertiesSync' is present", () => {
const message = { integrations: { Amplitude: { skipUserPropertiesSync: true } } };
const payload = { key: 'value' };
const expectedPayload = { key: 'value', $skip_user_properties_sync: true };
updateWithSkipAttribute(message, payload);
expect(expectedPayload).toEqual(payload);
});

// When 'skipUserPropertiesSync' is not present in 'integrations.Amplitude', return the original payload.
it("should return the original payload when 'skipUserPropertiesSync' is not present", () => {
const message = { integrations: { Amplitude: {} } };
const payload = { key: 'value' };
const expectedPayload = { key: 'value' };
updateWithSkipAttribute(message, payload);
expect(payload).toEqual(expectedPayload);
});
// When 'message' is null, return null.
it("should return null when 'message' is null", () => {
const message = null;
const payload = { key: 'value' };
const expectedPayload = { key: 'value' };
updateWithSkipAttribute(message, payload);
expect(payload).toEqual(expectedPayload);
});
});
9 changes: 9 additions & 0 deletions src/v0/destinations/am/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
const get = require('get-value');
const uaParser = require('@amplitude/ua-parser-js');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const set = require('set-value');
const logger = require('../../../logger');
const { isDefinedAndNotNull } = require('../../util');

Expand Down Expand Up @@ -110,6 +111,13 @@ const getUnsetObj = (message) => {
return unsetObject;
};

const updateWithSkipAttribute = (message, payload) => {
const skipAttribute = get(message, 'integrations.Amplitude.skipUserPropertiesSync');
if (skipAttribute) {
set(payload, '$skip_user_properties_sync', true);
}
};

/**
* Check for evType as in some cases, like when the page name is absent,
* either the template depends only on the event.name or there is no template provided by user
Expand Down Expand Up @@ -187,4 +195,5 @@ module.exports = {
getUnsetObj,
validateEventType,
userPropertiesPostProcess,
updateWithSkipAttribute,
};
Loading

0 comments on commit b2f4830

Please sign in to comment.