Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for vdm next to fb custom audiences #3729

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ type Destination = {
IsConnectionEnabled?: boolean;
};

type Connection = {
sourceId: string;
destinationId: string;
enabled: boolean;
config: Record<string, unknown>;
processorEnabled?: boolean;
koladilip marked this conversation as resolved.
Show resolved Hide resolved
};

type UserTransformationLibrary = {
VersionID: string;
};
Expand All @@ -151,6 +159,7 @@ type ProcessorTransformationRequest = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
libraries?: UserTransformationLibrary[];
credentials?: Credential[];
};
Expand All @@ -160,6 +169,7 @@ type RouterTransformationRequestData = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
};

type RouterTransformationRequest = {
Expand Down Expand Up @@ -350,6 +360,7 @@ export {
DeliveryJobState,
DeliveryV0Response,
DeliveryV1Response,
Connection,
Destination,
ErrorDetailer,
MessageIdMetadataMap,
Expand Down
6 changes: 4 additions & 2 deletions src/v0/destinations/fb_custom_audience/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const subTypeFields = [
'CONTACT_IMPORTER',
'DATA_FILE',
];
// as per real time experimentation maximum 500 users can be added at a time
// const MAX_USER_COUNT = 500; (using from destination definition)

const USER_ADD = 'add';
const USER_DELETE = 'remove';
// https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences/
const MAX_USER_COUNT = 10000;
/* No official Documentation is available for this but using trial
and error method we found that 65000 bytes is the maximum payload allowed size but we are 60000 just to be sure batching is done properly
*/
Expand All @@ -102,6 +103,7 @@ module.exports = {
schemaFields,
USER_ADD,
USER_DELETE,
MAX_USER_COUNT,
typeFields,
subTypeFields,
maxPayloadSize,
Expand Down
152 changes: 103 additions & 49 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const {
InstrumentationError,
ConfigurationError,
PlatformError,
} = require('@rudderstack/integrations-lib');
const { schemaFields, MAX_USER_COUNT } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const stats = require('../../../util/stats');
const {
Expand All @@ -11,6 +15,8 @@ const {
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
isEventSentByVDMV2Flow,
isEventSentByVDMV1Flow,
} = require('../../util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const {
Expand All @@ -20,6 +26,7 @@ const {
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
generateAppSecretProof,
} = require('./util');

const processRecordEventArray = (
Expand All @@ -31,7 +38,7 @@ const processRecordEventArray = (
prepareParams,
destination,
operation,
operationAudienceId,
audienceId,
) => {
const toSendEvents = [];
const metadata = [];
Expand Down Expand Up @@ -88,7 +95,7 @@ const processRecordEventArray = (
operationCategory: operation,
};

const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId);
const builtResponse = responseBuilderSimple(wrappedResponse, audienceId);

toSendEvents.push(builtResponse);
});
Expand All @@ -99,49 +106,26 @@ const processRecordEventArray = (
return response;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
function preparePayload(events, config) {
const { audienceId, userSchema, isRaw, type, subType, isHashRequired, disableFormat } = config;
const { destination } = events[0];
const { accessToken, appSecret } = destination.Config;
const prepareParams = {
access_token: accessToken,
};

// maxUserCount validation
const maxUserCountNumber = parseInt(maxUserCount, 10);
if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');
if (isDefinedAndNotNullAndNotEmpty(appSecret)) {
const dateNow = Date.now();
prepareParams.appsecret_time = Math.floor(dateNow / 1000); // Get current Unix time in seconds
prepareParams.appsecret_proof = generateAppSecretProof(accessToken, appSecret, dateNow);
}

// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (mappedToDestination) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
const cleanUserSchema = userSchema.map((field) => field.trim());

// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];
if (!isDefinedAndNotNullAndNotEmpty(audienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
if (!checkSubsetOfArray(schemaFields, userSchema)) {
if (!checkSubsetOfArray(schemaFields, cleanUserSchema)) {
throw new ConfigurationError('One or more of the schema fields are not supported');
}

Expand All @@ -156,7 +140,7 @@ async function processRecordInputs(groupedRecordInputs) {
paramsPayload.data_source = dataSource;
}

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
const groupedRecordsByAction = lodash.groupBy(events, (record) =>
record.message.action?.toLowerCase(),
);

Expand All @@ -167,55 +151,55 @@ async function processRecordInputs(groupedRecordInputs) {
if (groupedRecordsByAction.delete) {
const deleteRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.delete,
maxUserCountNumber,
MAX_USER_COUNT,
);
deleteResponse = processRecordEventArray(
deleteRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'remove',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.insert) {
const insertRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.insert,
maxUserCountNumber,
MAX_USER_COUNT,
);

insertResponse = processRecordEventArray(
insertRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.update) {
const updateRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.update,
maxUserCountNumber,
MAX_USER_COUNT,
);
updateResponse = processRecordEventArray(
updateRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

Expand All @@ -225,6 +209,7 @@ async function processRecordInputs(groupedRecordInputs) {
deleteResponse,
insertResponse,
updateResponse,

errorResponse,
);
if (finalResponse.length === 0) {
Expand All @@ -235,6 +220,75 @@ async function processRecordInputs(groupedRecordInputs) {
return finalResponse;
}

async function processRecordInputsV1(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId, userSchema } =
destination.Config;

// mapped to destination
const mappedToDestination = get(message, MappedToDestinationKey);
let operationAudienceId = audienceId;
if (mappedToDestination) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}

// user schema override
let updatedUserSchema = userSchema;
if (mappedToDestination) {
updatedUserSchema = getSchemaForEventMappedToDest(message);
}

return preparePayload(groupedRecordInputs, {
audienceId: operationAudienceId,
userSchema: updatedUserSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
}

const processRecordInputsV2 = async (groupedRecordInputs) => {
const { connection, message } = groupedRecordInputs[0];
koladilip marked this conversation as resolved.
Show resolved Hide resolved
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId } =
connection.config.destination;
const identifiers = message?.identifiers;
let userSchema;
if (identifiers) {
userSchema = Object.keys(identifiers);
krishna2020 marked this conversation as resolved.
Show resolved Hide resolved
}
const events = groupedRecordInputs.map((record) => ({
...record,
message: {
...record.message,
fields: record.message.identifiers,
},
}));
return preparePayload(events, {
audienceId,
userSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
};

async function processRecordInputs(groupedRecordInputs) {
const event = groupedRecordInputs[0];
if (isEventSentByVDMV1Flow(event)) {
return processRecordInputsV1(groupedRecordInputs);
}
if (isEventSentByVDMV2Flow(event)) {
koladilip marked this conversation as resolved.
Show resolved Hide resolved
return processRecordInputsV2(groupedRecordInputs);
}
throw new PlatformError('unsupported record event format');
}

module.exports = {
processRecordInputs,
};
14 changes: 5 additions & 9 deletions src/v0/destinations/fb_custom_audience/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
responseBuilderSimple,
getDataSource,
} = require('./util');
const { schemaFields, USER_ADD, USER_DELETE } = require('./config');
const { schemaFields, USER_ADD, USER_DELETE, MAX_USER_COUNT } = require('./config');

const { MappedToDestinationKey } = require('../../../constants');
const { processRecordInputs } = require('./recordTransform');
Expand Down Expand Up @@ -158,15 +158,11 @@ const processEvent = (message, destination) => {
const respList = [];
let toSendEvents = [];
let { userSchema } = destination.Config;
const { isHashRequired, audienceId, maxUserCount } = destination.Config;
const { isHashRequired, audienceId } = destination.Config;
if (!message.type) {
throw new InstrumentationError('Message Type is not present. Aborting message.');
}
const maxUserCountNumber = parseInt(maxUserCount, 10);

if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');
}
if (message.type.toLowerCase() !== 'audiencelist') {
throw new InstrumentationError(` ${message.type} call is not supported `);
}
Expand Down Expand Up @@ -198,7 +194,7 @@ const processEvent = (message, destination) => {

// when "remove" is present in the payload
if (isDefinedAndNotNullAndNotEmpty(listData[USER_DELETE])) {
const audienceChunksArray = returnArrayOfSubarrays(listData[USER_DELETE], maxUserCountNumber);
const audienceChunksArray = returnArrayOfSubarrays(listData[USER_DELETE], MAX_USER_COUNT);
toSendEvents = prepareToSendEvents(
message,
destination,
Expand All @@ -211,7 +207,7 @@ const processEvent = (message, destination) => {

// When "add" is present in the payload
if (isDefinedAndNotNullAndNotEmpty(listData[USER_ADD])) {
const audienceChunksArray = returnArrayOfSubarrays(listData[USER_ADD], maxUserCountNumber);
const audienceChunksArray = returnArrayOfSubarrays(listData[USER_ADD], MAX_USER_COUNT);
toSendEvents.push(
...prepareToSendEvents(
message,
Expand Down Expand Up @@ -252,7 +248,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
}

if (groupedInputs.record) {
transformedRecordEvent = await processRecordInputs(groupedInputs.record, reqMetadata);
transformedRecordEvent = await processRecordInputs(groupedInputs.record);
}

if (groupedInputs.audiencelist) {
Expand Down
Loading
Loading