Skip to content

Commit

Permalink
feat: batching in cm360
Browse files Browse the repository at this point in the history
  • Loading branch information
aashishmalik committed Sep 25, 2023
1 parent 0218292 commit 19f9829
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 19 deletions.
3 changes: 3 additions & 0 deletions src/v0/destinations/campaign_manager/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const ConfigCategories = {
},
};

const MAX_BATCH_CONVERSATIONS_SIZE = 1000;

const EncryptionEntityType = [
'ENCRYPTION_ENTITY_TYPE_UNKNOWN',
'DCM_ACCOUNT',
Expand All @@ -28,4 +30,5 @@ module.exports = {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
};
122 changes: 115 additions & 7 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
const lodash = require('lodash');
const { EventType } = require('../../../constants');
const {
constructPayload,
defaultRequestConfig,
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
simpleProcessRouterDest,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');

Expand All @@ -15,6 +19,7 @@ const {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
} = require('./config');

const { InstrumentationError } = require('../../util/errorTypes');
Expand Down Expand Up @@ -75,10 +80,12 @@ function processTrack(message, metadata, destination) {
const date = new Date(requestJson.timestampMicros);
let unixTimestamp = date.getTime();
// Date, moment both are not able to distinguish input if it is second,millisecond or microsecond unix timestamp
// Using count of digits to distinguish between the 3, 9999999999999 (13 digits) means Nov 20 2286
if (unixTimestamp.toString().length === 13) { // milliseconds
// Using count of digits to distinguish between the 3, 9999999999999 (13 digits) means Nov 20 2286
if (unixTimestamp.toString().length === 13) {
// milliseconds
unixTimestamp *= 1000;
} else if (unixTimestamp.toString().length === 10) { // seconds
} else if (unixTimestamp.toString().length === 10) {
// seconds
unixTimestamp *= 1000000;

Check warning on line 89 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L89

Added line #L89 was not covered by tests
}
requestJson.timestampMicros = unixTimestamp.toString();
Expand Down Expand Up @@ -147,7 +154,7 @@ function postValidateRequest(response) {
);
}

if (
if (
!response.body.JSON.conversions[0].gclid &&
!response.body.JSON.conversions[0].matchId &&
!response.body.JSON.conversions[0].dclid &&
Expand Down Expand Up @@ -185,9 +192,110 @@ function process(event) {
return response;
}

const generateBatch = (eventKind, events) => {
const batchRequestObject = defaultBatchRequestConfig();
const conversions = [];
let encryptionInfo = {};
const metadata = [];
// extracting destination, message from the first event in a batch
const { destination, message } = events[0];
// Batch event into dest batch structure
events.forEach((ev) => {
conversions.push(...ev.message.body.JSON.conversions);
metadata.push(ev.metadata);
if (ev.message.body.JSON.encryptionInfo) {
encryptionInfo = ev.message.body.JSON.encryptionInfo;
}
});

batchRequestObject.batchedRequest.body.JSON = {
kind: eventKind,
conversions,
};

if (Object.keys(encryptionInfo).length > 0) {
batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo;
}

batchRequestObject.batchedRequest.endpoint = message.endpoint;

batchRequestObject.batchedRequest.headers = message.headers;

return {
...batchRequestObject,
metadata,
destination,
};
};

const batchEvents = (eventChunksArray) => {
const batchedResponseList = [];

// group batchInsert and batchUpdate payloads
const groupedEventChunks = lodash.groupBy(
eventChunksArray,
(event) => event.message.body.JSON.kind,
);
Object.keys(groupedEventChunks).forEach((eventKind) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatch(eventKind, chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
return batchedResponseList;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;

Check warning on line 260 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L260

Added line #L260 was not covered by tests
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventChunksArray.push({

Check warning on line 271 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L271

Added line #L271 was not covered by tests
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const proccessedRespList = process(event);
const transformedPayload = {
message: proccessedRespList,
metadata: event.metadata,
destination,
};
eventChunksArray.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
}),
);

let batchResponseList = [];
if (eventChunksArray.length > 0) {
batchResponseList = batchEvents(eventChunksArray);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
5 changes: 2 additions & 3 deletions test/__tests__/campaign_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ describe(`${name} Tests`, () => {
testData.forEach((dataPoint, index) => {
it(`${index}. ${integration} - ${dataPoint.description}`, async () => {
try {
let output = await transformer.process(dataPoint.input);
delete output.body.JSON.idempotency;
const output = await transformer.process(dataPoint.input);
expect(output).toEqual(dataPoint.output);
} catch (error) {
expect(error.message).toEqual(dataPoint.output.error);
}
});
});
});

describe("Router Tests", () => {
it("Payload", async () => {
const routerOutput = await transformer.processRouterDest(inputRouterData);
Expand Down
94 changes: 88 additions & 6 deletions test/__tests__/data/campaign_manager_router_input.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,14 @@
"density": 2
}
},
"event": "Promotion Clicked",
"type": "track",
"originalTimestamp": "2022-11-17T00:22:02.903+05:30",
"properties": {
"profileId": 437689,
"floodlightConfigurationId": "213123123",
"ordinal": "string",
"quantity": "455678",
"floodlightActivityId": "456543345245",
"value": 7,
"value": 47,
"encryptedUserIdCandidates": ["dfghjbnm"],
"limitAdTracking": true,
"childDirectedTreatment": true,
Expand Down Expand Up @@ -151,8 +149,94 @@
"density": 2
}
},
"event": "Promotion Clicked",
"originalTimestamp": "2023-11-17T00:22:02.903+05:30",
"properties": {
"profileId": 437689,
"floodlightConfigurationId": "213123123",
"ordinal": "345345345",
"quantity": "4556784325345345",
"floodlightActivityId": "456543345245",
"value": 17,
"encryptedUserIdCandidates": ["dfghjbnm"],
"limitAdTracking": true,
"childDirectedTreatment": true,
"encryptionSource": "AD_SERVING",
"encryptionEntityId": "3564523",
"encryptionEntityType": "DCM_ACCOUNT",
"requestType": "batchinsert"
},
"type": "track",
"event": "event test",
"anonymousId": "randomId",
"integrations": {
"All": true
},
"name": "ApplicationLoaded",
"sentAt": "2023-11-17T00:22:02.903+05:30"
}
},
{
"metadata": {
"secret": {
"access_token": "dummyApiToken",
"refresh_token": "efgh5678",
"developer_token": "ijkl91011"
},
"jobId": 2
},
"destination": {
"Config": {
"treatmentForUnderage": false,
"limitAdTracking": false,
"childDirectedTreatment": false,
"nonPersonalizedAd": false,
"rudderAccountId": "2EOknn1JNH7WK1MfNku4fGYKkRK"
}
},
"message": {
"channel": "web",
"context": {
"app": {
"build": "1.0.0",
"name": "RudderLabs JavaScript SDK",
"namespace": "com.rudderlabs.javascript",
"version": "1.0.0"
},
"device": {
"id": "0572f78fa49c648e",
"name": "generic_x86_arm",
"type": "Android",
"model": "AOSP on IA Emulator",
"manufacturer": "Google",
"adTrackingEnabled": true,
"advertisingId": "44c97318-9040-4361-8bc7-4eb30f665ca8"
},
"traits": {
"email": "[email protected]",
"phone": "+1-202-555-0146",
"firstName": "John",
"lastName": "Gomes",
"city": "London",
"state": "England",
"countryCode": "GB",
"postalCode": "EC3M",
"streetAddress": "71 Cherry Court SOUTHAMPTON SO53 5PD UK"
},
"library": {
"name": "RudderLabs JavaScript SDK",
"version": "1.0.0"
},
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36",
"locale": "en-US",
"ip": "0.0.0.0",
"os": {
"name": "",
"version": ""
},
"screen": {
"density": 2
}
},
"originalTimestamp": "2022-11-17T00:22:02.903+05:30",
"properties": {
"profileId": 437689,
Expand Down Expand Up @@ -241,8 +325,6 @@
"density": 2
}
},
"event": "Promotion Clicked",
"type": "track",
"originalTimestamp": "2022-11-17T00:22:02.903+05:30",
"properties": {
"profileId": 437689,
Expand Down
27 changes: 24 additions & 3 deletions test/__tests__/data/campaign_manager_router_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,23 @@
"ordinal": "string",
"quantity": "455678",
"floodlightActivityId": "456543345245",
"value": 7,
"value": 47,
"encryptedUserIdCandidates": ["dfghjbnm"],
"limitAdTracking": true,
"childDirectedTreatment": true
},
{
"floodlightConfigurationId": "213123123",
"ordinal": "345345345",
"quantity": "4556784325345345",
"floodlightActivityId": "456543345245",
"value": 17,
"encryptedUserIdCandidates": ["dfghjbnm"],
"limitAdTracking": true,
"childDirectedTreatment": true,
"nonPersonalizedAd": false,
"timestampMicros": "1700160722903000",
"treatmentForUnderage": false
}
]
},
Expand All @@ -49,9 +62,17 @@
"refresh_token": "efgh5678"
},
"jobId": 1
},
{
"jobId": 2,
"secret": {
"access_token": "dummyApiToken",
"developer_token": "ijkl91011",
"refresh_token": "efgh5678"
}
}
],
"batched": false,
"batched": true,
"statusCode": 200,
"destination": {
"Config": {
Expand Down Expand Up @@ -113,7 +134,7 @@
"jobId": 2
}
],
"batched": false,
"batched": true,
"statusCode": 200,
"destination": {
"Config": {
Expand Down

0 comments on commit 19f9829

Please sign in to comment.