From 3771b3dc24f9a5d8c5e5e1a781c823e923b810dd Mon Sep 17 00:00:00 2001 From: AASHISH MALIK Date: Mon, 25 Sep 2023 17:58:17 +0530 Subject: [PATCH] feat: batching in cm360 --- .../destinations/campaign_manager/config.js | 3 + .../campaign_manager/transform.js | 122 +++++++++++++++++- test/__tests__/campaign_manager.test.js | 5 +- .../data/campaign_manager_router_input.json | 94 +++++++++++++- .../data/campaign_manager_router_output.json | 27 +++- 5 files changed, 232 insertions(+), 19 deletions(-) diff --git a/src/v0/destinations/campaign_manager/config.js b/src/v0/destinations/campaign_manager/config.js index 063f65cb14..b3a9531347 100644 --- a/src/v0/destinations/campaign_manager/config.js +++ b/src/v0/destinations/campaign_manager/config.js @@ -9,6 +9,8 @@ const ConfigCategories = { }, }; +const MAX_BATCH_CONVERSATIONS_SIZE = 1000; + const EncryptionEntityType = [ 'ENCRYPTION_ENTITY_TYPE_UNKNOWN', 'DCM_ACCOUNT', @@ -28,4 +30,5 @@ module.exports = { BASE_URL, EncryptionEntityType, EncryptionSource, + MAX_BATCH_CONVERSATIONS_SIZE, }; diff --git a/src/v0/destinations/campaign_manager/transform.js b/src/v0/destinations/campaign_manager/transform.js index 723980f0e4..a8983ebccd 100644 --- a/src/v0/destinations/campaign_manager/transform.js +++ b/src/v0/destinations/campaign_manager/transform.js @@ -1,11 +1,15 @@ +const lodash = require('lodash'); const { EventType } = require('../../../constants'); const { constructPayload, defaultRequestConfig, defaultPostRequestConfig, + defaultBatchRequestConfig, removeUndefinedAndNullValues, + getSuccessRespEvents, isDefinedAndNotNull, - simpleProcessRouterDest, + checkInvalidRtTfEvents, + handleRtTfSingleEventError, getAccessToken, } = require('../../util'); @@ -15,6 +19,7 @@ const { BASE_URL, EncryptionEntityType, EncryptionSource, + MAX_BATCH_CONVERSATIONS_SIZE, } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); @@ -75,10 +80,12 @@ function processTrack(message, metadata, destination) { const date = new Date(requestJson.timestampMicros); let unixTimestamp = date.getTime(); // Date, moment both are not able to distinguish input if it is second,millisecond or microsecond unix timestamp - // Using count of digits to distinguish between the 3, 9999999999999 (13 digits) means Nov 20 2286 - if (unixTimestamp.toString().length === 13) { // milliseconds + // Using count of digits to distinguish between the 3, 9999999999999 (13 digits) means Nov 20 2286 + if (unixTimestamp.toString().length === 13) { + // milliseconds unixTimestamp *= 1000; - } else if (unixTimestamp.toString().length === 10) { // seconds + } else if (unixTimestamp.toString().length === 10) { + // seconds unixTimestamp *= 1000000; } requestJson.timestampMicros = unixTimestamp.toString(); @@ -147,7 +154,7 @@ function postValidateRequest(response) { ); } - if ( + if ( !response.body.JSON.conversions[0].gclid && !response.body.JSON.conversions[0].matchId && !response.body.JSON.conversions[0].dclid && @@ -185,9 +192,110 @@ function process(event) { return response; } +const generateBatch = (eventKind, events) => { + const batchRequestObject = defaultBatchRequestConfig(); + const conversions = []; + let encryptionInfo = {}; + const metadata = []; + // extracting destination, message from the first event in a batch + const { destination, message } = events[0]; + // Batch event into dest batch structure + events.forEach((ev) => { + conversions.push(...ev.message.body.JSON.conversions); + metadata.push(ev.metadata); + if (ev.message.body.JSON.encryptionInfo) { + encryptionInfo = ev.message.body.JSON.encryptionInfo; + } + }); + + batchRequestObject.batchedRequest.body.JSON = { + kind: eventKind, + conversions, + }; + + if (Object.keys(encryptionInfo).length > 0) { + batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo; + } + + batchRequestObject.batchedRequest.endpoint = message.endpoint; + + batchRequestObject.batchedRequest.headers = message.headers; + + return { + ...batchRequestObject, + metadata, + destination, + }; +}; + +const batchEvents = (eventChunksArray) => { + const batchedResponseList = []; + + // group batchInsert and batchUpdate payloads + const groupedEventChunks = lodash.groupBy( + eventChunksArray, + (event) => event.message.body.JSON.kind, + ); + Object.keys(groupedEventChunks).forEach((eventKind) => { + // eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..] + const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE); + eventChunks.forEach((chunk) => { + const batchEventResponse = generateBatch(eventKind, chunk); + batchedResponseList.push( + getSuccessRespEvents( + batchEventResponse.batchedRequest, + batchEventResponse.metadata, + batchEventResponse.destination, + true, + ), + ); + }); + }); + return batchedResponseList; +}; + const processRouterDest = async (inputs, reqMetadata) => { - const respList = await simpleProcessRouterDest(inputs, process, reqMetadata); - return respList; + const errorRespEvents = checkInvalidRtTfEvents(inputs); + if (errorRespEvents.length > 0) { + return errorRespEvents; + } + + const batchErrorRespList = []; + const eventChunksArray = []; + const { destination } = inputs[0]; + await Promise.all( + inputs.map(async (event) => { + try { + if (event.message.statusCode) { + // already transformed event + eventChunksArray.push({ + message: event.message, + metadata: event.metadata, + destination, + }); + } else { + // if not transformed + const proccessedRespList = process(event); + const transformedPayload = { + message: proccessedRespList, + metadata: event.metadata, + destination, + }; + eventChunksArray.push(transformedPayload); + } + } catch (error) { + const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata); + batchErrorRespList.push(errRespEvent); + } + }), + ); + + let batchResponseList = []; + if (eventChunksArray.length > 0) { + batchResponseList = batchEvents(eventChunksArray); + } + + return [...batchResponseList, ...batchErrorRespList]; }; module.exports = { process, processRouterDest }; diff --git a/test/__tests__/campaign_manager.test.js b/test/__tests__/campaign_manager.test.js index 4f3a40d520..15568eddb2 100644 --- a/test/__tests__/campaign_manager.test.js +++ b/test/__tests__/campaign_manager.test.js @@ -28,8 +28,7 @@ describe(`${name} Tests`, () => { testData.forEach((dataPoint, index) => { it(`${index}. ${integration} - ${dataPoint.description}`, async () => { try { - let output = await transformer.process(dataPoint.input); - delete output.body.JSON.idempotency; + const output = await transformer.process(dataPoint.input); expect(output).toEqual(dataPoint.output); } catch (error) { expect(error.message).toEqual(dataPoint.output.error); @@ -37,7 +36,7 @@ describe(`${name} Tests`, () => { }); }); }); - + describe("Router Tests", () => { it("Payload", async () => { const routerOutput = await transformer.processRouterDest(inputRouterData); diff --git a/test/__tests__/data/campaign_manager_router_input.json b/test/__tests__/data/campaign_manager_router_input.json index 18893a0b43..8cae574c55 100644 --- a/test/__tests__/data/campaign_manager_router_input.json +++ b/test/__tests__/data/campaign_manager_router_input.json @@ -61,8 +61,6 @@ "density": 2 } }, - "event": "Promotion Clicked", - "type": "track", "originalTimestamp": "2022-11-17T00:22:02.903+05:30", "properties": { "profileId": 437689, @@ -70,7 +68,7 @@ "ordinal": "string", "quantity": "455678", "floodlightActivityId": "456543345245", - "value": 7, + "value": 47, "encryptedUserIdCandidates": ["dfghjbnm"], "limitAdTracking": true, "childDirectedTreatment": true, @@ -151,8 +149,94 @@ "density": 2 } }, - "event": "Promotion Clicked", + "originalTimestamp": "2023-11-17T00:22:02.903+05:30", + "properties": { + "profileId": 437689, + "floodlightConfigurationId": "213123123", + "ordinal": "345345345", + "quantity": "4556784325345345", + "floodlightActivityId": "456543345245", + "value": 17, + "encryptedUserIdCandidates": ["dfghjbnm"], + "limitAdTracking": true, + "childDirectedTreatment": true, + "encryptionSource": "AD_SERVING", + "encryptionEntityId": "3564523", + "encryptionEntityType": "DCM_ACCOUNT", + "requestType": "batchinsert" + }, "type": "track", + "event": "event test", + "anonymousId": "randomId", + "integrations": { + "All": true + }, + "name": "ApplicationLoaded", + "sentAt": "2023-11-17T00:22:02.903+05:30" + } + }, + { + "metadata": { + "secret": { + "access_token": "dummyApiToken", + "refresh_token": "efgh5678", + "developer_token": "ijkl91011" + }, + "jobId": 2 + }, + "destination": { + "Config": { + "treatmentForUnderage": false, + "limitAdTracking": false, + "childDirectedTreatment": false, + "nonPersonalizedAd": false, + "rudderAccountId": "2EOknn1JNH7WK1MfNku4fGYKkRK" + } + }, + "message": { + "channel": "web", + "context": { + "app": { + "build": "1.0.0", + "name": "RudderLabs JavaScript SDK", + "namespace": "com.rudderlabs.javascript", + "version": "1.0.0" + }, + "device": { + "id": "0572f78fa49c648e", + "name": "generic_x86_arm", + "type": "Android", + "model": "AOSP on IA Emulator", + "manufacturer": "Google", + "adTrackingEnabled": true, + "advertisingId": "44c97318-9040-4361-8bc7-4eb30f665ca8" + }, + "traits": { + "email": "alex@example.com", + "phone": "+1-202-555-0146", + "firstName": "John", + "lastName": "Gomes", + "city": "London", + "state": "England", + "countryCode": "GB", + "postalCode": "EC3M", + "streetAddress": "71 Cherry Court SOUTHAMPTON SO53 5PD UK" + }, + "library": { + "name": "RudderLabs JavaScript SDK", + "version": "1.0.0" + }, + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36", + "locale": "en-US", + "ip": "0.0.0.0", + "os": { + "name": "", + "version": "" + }, + "screen": { + "density": 2 + } + }, "originalTimestamp": "2022-11-17T00:22:02.903+05:30", "properties": { "profileId": 437689, @@ -241,8 +325,6 @@ "density": 2 } }, - "event": "Promotion Clicked", - "type": "track", "originalTimestamp": "2022-11-17T00:22:02.903+05:30", "properties": { "profileId": 437689, diff --git a/test/__tests__/data/campaign_manager_router_output.json b/test/__tests__/data/campaign_manager_router_output.json index 61c3438ab6..f92713e2b3 100644 --- a/test/__tests__/data/campaign_manager_router_output.json +++ b/test/__tests__/data/campaign_manager_router_output.json @@ -28,10 +28,23 @@ "ordinal": "string", "quantity": "455678", "floodlightActivityId": "456543345245", - "value": 7, + "value": 47, "encryptedUserIdCandidates": ["dfghjbnm"], "limitAdTracking": true, "childDirectedTreatment": true + }, + { + "floodlightConfigurationId": "213123123", + "ordinal": "345345345", + "quantity": "4556784325345345", + "floodlightActivityId": "456543345245", + "value": 17, + "encryptedUserIdCandidates": ["dfghjbnm"], + "limitAdTracking": true, + "childDirectedTreatment": true, + "nonPersonalizedAd": false, + "timestampMicros": "1700160722903000", + "treatmentForUnderage": false } ] }, @@ -49,9 +62,17 @@ "refresh_token": "efgh5678" }, "jobId": 1 + }, + { + "jobId": 2, + "secret": { + "access_token": "dummyApiToken", + "developer_token": "ijkl91011", + "refresh_token": "efgh5678" + } } ], - "batched": false, + "batched": true, "statusCode": 200, "destination": { "Config": { @@ -113,7 +134,7 @@ "jobId": 2 } ], - "batched": false, + "batched": true, "statusCode": 200, "destination": { "Config": {