Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cm360 batching support #2651

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v0/destinations/campaign_manager/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const ConfigCategories = {
},
};

const MAX_BATCH_CONVERSATIONS_SIZE = 1000;

const EncryptionEntityType = [
'ENCRYPTION_ENTITY_TYPE_UNKNOWN',
'DCM_ACCOUNT',
Expand All @@ -28,4 +30,5 @@ module.exports = {
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
"destKey": "timestampMicros",
"sourceKeys": "timestamp",
"sourceFromGenericMap": true,
"required": true,
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
"metadata": {
"type": "microSecondTimestamp"
}
"required": true
},
{
"destKey": "floodlightActivityId",
Expand Down
34 changes: 28 additions & 6 deletions src/v0/destinations/campaign_manager/networkHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,36 @@
const tags = require('../../util/tags');

function checkIfFailuresAreRetryable(response) {
const { status } = response;
let isRetryable = true;

Check warning on line 13 in src/v0/destinations/campaign_manager/networkHandler.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/networkHandler.js#L12-L13

Added lines #L12 - L13 were not covered by tests
try {
if (Array.isArray(response.status) && Array.isArray(response.status[0].errors)) {
return (
response.status[0].errors[0].code !== 'PERMISSION_DENIED' &&
response.status[0].errors[0].code !== 'INVALID_ARGUMENT'
);
if (Array.isArray(status)) {
// iterate over each status, and if found retryable in conversations ..retry else discard
/* status : [{
"conversion": {
object (Conversion)
},
"errors": [
{
object (ConversionError)
}
],
"kind": string
}] */
for (const st of status) {
st.errors.forEach(err => {

Check warning on line 29 in src/v0/destinations/campaign_manager/networkHandler.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/networkHandler.js#L28-L29

Added lines #L28 - L29 were not covered by tests
// if code is any of these, event is not retryable
if (
err.code === 'PERMISSION_DENIED' ||
err.code === 'INVALID_ARGUMENT' ||
err.code === 'NOT_FOUND'

Check warning on line 34 in src/v0/destinations/campaign_manager/networkHandler.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/networkHandler.js#L32-L34

Added lines #L32 - L34 were not covered by tests
) {
isRetryable = false;

Check warning on line 36 in src/v0/destinations/campaign_manager/networkHandler.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/networkHandler.js#L36

Added line #L36 was not covered by tests
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
}
})
}
}
return true;
return isRetryable;

Check warning on line 41 in src/v0/destinations/campaign_manager/networkHandler.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/networkHandler.js#L41

Added line #L41 was not covered by tests
} catch (e) {
return true;
}
Expand Down
174 changes: 141 additions & 33 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
const lodash = require('lodash');
const { EventType } = require('../../../constants');

const {
constructPayload,
defaultRequestConfig,
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
simpleProcessRouterDest,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');

Expand All @@ -16,6 +19,7 @@
BASE_URL,
EncryptionEntityType,
EncryptionSource,
MAX_BATCH_CONVERSATIONS_SIZE,
} = require('./config');

const { InstrumentationError } = require('../../util/errorTypes');
Expand Down Expand Up @@ -72,7 +76,28 @@
delete requestJson.childDirectedTreatment;
delete requestJson.limitAdTracking;
}
requestJson.timestampMicros = requestJson.timestampMicros.toString();

// for handling when input is timestamp as string
const numTimestamp = /^\d+$/.test(requestJson.timestampMicros);
if (numTimestamp) {
// is digit only, below convert string timestamp to numeric
requestJson.timestampMicros *= 1;
}

// 2022-10-11T05:453:90.ZZ
// 16483423423423423
const date = new Date(requestJson.timestampMicros);
let unixTimestamp = date.getTime();
Copy link
Collaborator

@krishna2020 krishna2020 Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this always return in milliseconds only irrespective of input passed ?

Copy link
Contributor Author

@aashishmalik aashishmalik Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If input is 1668624722555000 (this is in microseconds, year 2022) , the date utility still considers it is in milliseconds and convert this to 054846-08-12T03:22:35.000Z (year 5000) , when we call getTime it return 1668624722555000, so doing no of digits check here.

Same case with 1668624722 it considers it in ms, and getTime return it in 1668624722, so need to multiply by 6

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think one alternative thing we can do if they pass timestamp in properties, we can make it mandatory to pass it in microSeconds and get rid of this conversion at our end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function convertToMicroseconds(input) {
  const timestamp = Date.parse(input);

  if (!isNaN(timestamp)) {
    // If the input is a valid date string, timestamp will be a number
    if (input.includes("Z")) {
      // ISO 8601 date string with milliseconds
      return timestamp * 1000;
    } else {
      // ISO 8601 date string without milliseconds
      return timestamp * 1000000;
    }
  } else if (/^\d+$/.test(input)) {
    // If the input is a numeric string (assume microseconds or milliseconds)
    if (input.length <= 13) {
      // Length less than or equal to 13 indicates milliseconds
      return parseInt(input) * 1000;
    } else {
      // Otherwise, assume microseconds
      return parseInt(input);
    }
  } else {
    // Invalid input
    throw new Error("Invalid input format");
  }
}

// Example usage:
const input1 = "2023-10-11T12:34:56.789Z"; // ISO 8601 date string with milliseconds
const input2 = "1633956896789"; // Milliseconds
const input3 = "1633956896789000"; // Microseconds

console.log(convertToMicroseconds(input1)); // Output: 1633956896789000
console.log(convertToMicroseconds(input2)); // Output: 1633956896789000
console.log(convertToMicroseconds(input3)); // Output: 1633956896789000

This code defines the convertToMicroseconds function, which first checks if the input is a valid date string (ISO 8601 format). If it is, it checks whether the input has milliseconds or not and converts it accordingly. If the input is a numeric string, it checks its length to determine whether it's in milliseconds or microseconds. If the input doesn't match any of these formats, it throws an error for an invalid input format.
Source: chatgpt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

console.log(convertToMicroseconds("2022-11-17T00:22:02.903+05:30"));
o/p -->>1668624722903000000 (in nanosweconds)
will modify this a bit

aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
// Date, moment both are not able to distinguish input if it is second,millisecond or microsecond unix timestamp
// Using count of digits to distinguish between these 3, 9999999999999 (13 digits) means Nov 20 2286 which is long far in future
if (unixTimestamp.toString().length === 13) {
// milliseconds
unixTimestamp *= 1000;
} else if (unixTimestamp.toString().length === 10) {
// seconds
unixTimestamp *= 1000000;

Check warning on line 98 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L98

Added line #L98 was not covered by tests
}
requestJson.timestampMicros = unixTimestamp.toString();

const encryptionInfo = {};
// prepare encrptionInfo if encryptedUserId or encryptedUserIdCandidates is given
Expand Down Expand Up @@ -138,35 +163,17 @@
);
}

let count = 0;

if (response.body.JSON.conversions[0].gclid) {
count += 1;
}

if (response.body.JSON.conversions[0].dclid) {
count += 1;
}

if (response.body.JSON.conversions[0].encryptedUserId) {
count += 1;
}

if (response.body.JSON.conversions[0].encryptedUserIdCandidates) {
count += 1;
}

if (response.body.JSON.conversions[0].mobileDeviceId) {
count += 1;
}

if (response.body.JSON.conversions[0].impressionId) {
count += 1;
}

if (count !== 1) {
if (
!response.body.JSON.conversions[0].gclid &&
!response.body.JSON.conversions[0].matchId &&
!response.body.JSON.conversions[0].dclid &&
!response.body.JSON.conversions[0].encryptedUserId &&
!response.body.JSON.conversions[0].encryptedUserIdCandidates &&
!response.body.JSON.conversions[0].mobileDeviceId &&
!response.body.JSON.conversions[0].impressionId
) {
aashishmalik marked this conversation as resolved.
Show resolved Hide resolved
throw new InstrumentationError(
'[CAMPAIGN MANAGER (DCM)]: For CM360 we need one of encryptedUserId,encryptedUserIdCandidates, matchId, mobileDeviceId, gclid, dclid, impressionId.',
'[CAMPAIGN MANAGER (DCM)]: Atleast one of encryptedUserId,encryptedUserIdCandidates, matchId, mobileDeviceId, gclid, dclid, impressionId.',
);
}
}
Expand Down Expand Up @@ -194,9 +201,110 @@
return response;
}

const generateBatch = (eventKind, events) => {
const batchRequestObject = defaultBatchRequestConfig();
const conversions = [];
let encryptionInfo = {};
const metadata = [];
// extracting destination, message from the first event in a batch
const { destination, message } = events[0];
// Batch event into dest batch structure
events.forEach((ev) => {
conversions.push(...ev.message.body.JSON.conversions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could keep this ev.message.body.JSON in a variable and reuse it instead of typing long strings

metadata.push(ev.metadata);
if (ev.message.body.JSON.encryptionInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will override the encryptionInfo, is this expected?

encryptionInfo = ev.message.body.JSON.encryptionInfo;
}
});

batchRequestObject.batchedRequest.body.JSON = {
kind: eventKind,
conversions,
};

if (Object.keys(encryptionInfo).length > 0) {
batchRequestObject.batchedRequest.body.JSON.encryptionInfo = encryptionInfo;
}

batchRequestObject.batchedRequest.endpoint = message.endpoint;

batchRequestObject.batchedRequest.headers = message.headers;

return {
...batchRequestObject,
metadata,
destination,
};
};

const batchEvents = (eventChunksArray) => {
const batchedResponseList = [];

// group batchInsert and batchUpdate payloads
const groupedEventChunks = lodash.groupBy(
eventChunksArray,
(event) => event.message.body.JSON.kind,
);
Object.keys(groupedEventChunks).forEach((eventKind) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = lodash.chunk(groupedEventChunks[eventKind], MAX_BATCH_CONVERSATIONS_SIZE);
eventChunks.forEach((chunk) => {
const batchEventResponse = generateBatch(eventKind, chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
});
return batchedResponseList;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;

Check warning on line 269 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L269

Added line #L269 was not covered by tests
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
await Promise.all(
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventChunksArray.push({

Check warning on line 280 in src/v0/destinations/campaign_manager/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/campaign_manager/transform.js#L280

Added line #L280 was not covered by tests
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const proccessedRespList = process(event);
const transformedPayload = {
message: proccessedRespList,
metadata: event.metadata,
destination,
};
eventChunksArray.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
}),
);

let batchResponseList = [];
if (eventChunksArray.length > 0) {
batchResponseList = batchEvents(eventChunksArray);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
5 changes: 2 additions & 3 deletions test/__tests__/campaign_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ describe(`${name} Tests`, () => {
testData.forEach((dataPoint, index) => {
it(`${index}. ${integration} - ${dataPoint.description}`, async () => {
try {
let output = await transformer.process(dataPoint.input);
delete output.body.JSON.idempotency;
const output = await transformer.process(dataPoint.input);
expect(output).toEqual(dataPoint.output);
} catch (error) {
expect(error.message).toEqual(dataPoint.output.error);
}
});
});
});

describe("Router Tests", () => {
it("Payload", async () => {
const routerOutput = await transformer.processRouterDest(inputRouterData);
Expand Down
Loading
Loading