Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboard X(Twiiter) Audience #3696

Merged
merged 9 commits into from
Sep 10, 2024
5 changes: 5 additions & 0 deletions src/v0/destinations/x_audience/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const BASE_URL =
'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:audience_id/users';
const MAX_PAYLOAD_SIZE_IN_BYTES = 4000000;
const MAX_OPERATIONS = 2500;
module.exports = { BASE_URL, MAX_PAYLOAD_SIZE_IN_BYTES, MAX_OPERATIONS };
66 changes: 66 additions & 0 deletions src/v0/destinations/x_audience/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/* eslint-disable @typescript-eslint/naming-convention */
const {
removeUndefinedAndNullAndEmptyValues,
InstrumentationError,
} = require('@rudderstack/integrations-lib');
const { handleRtTfSingleEventError } = require('../../util');
const { batchEvents, buildResponseWithJSON, getUserDetails } = require('./utils');
/**
* This function returns audience object in the form of destination API
* @param {*} message
* @param {*} destination
* @param {*} metadata
*/
const processRecordEvent = (message, config) => {
const { fields, action, type } = message;
if (type !== 'record') {
throw new InstrumentationError(`[X AUDIENCE]: ${type} is not supported`);
}
const { effective_at, expires_at } = fields;
anantjain45823 marked this conversation as resolved.
Show resolved Hide resolved
const users = [getUserDetails(fields, config)];

return {
operation_type: action !== 'delete' ? 'Update' : 'Delete',
params: removeUndefinedAndNullAndEmptyValues({
effective_at,
expires_at,
users,
}),
};
};
const process = (event) => {
const { message, destination, metadata } = event;
const { config } = destination;
const payload = [processRecordEvent(message, config)];
return buildResponseWithJSON(payload, config, metadata);
};
const processRouterDest = async (inputs, reqMetadata) => {
const responseList = []; // list containing single track event payload
const errorRespList = []; // list of error
const { destination } = inputs[0];
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
responseList.push(event);

Check warning on line 45 in src/v0/destinations/x_audience/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/x_audience/transform.js#L45

Added line #L45 was not covered by tests
} else {
// if not transformed
responseList.push({
message: processRecordEvent(event.message, destination?.config),
metadata: event.metadata,
destination,
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
errorRespList.push(errRespEvent);
}
});
let batchedResponseList = [];
if (responseList.length > 0) {
batchedResponseList = batchEvents(responseList, destination);
}
return [...batchedResponseList, ...errorRespList];
};

module.exports = { process, processRouterDest, buildResponseWithJSON };
234 changes: 234 additions & 0 deletions src/v0/destinations/x_audience/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/* eslint-disable @typescript-eslint/naming-convention */
const sha256 = require('sha256');
const lodash = require('lodash');
const jsonSize = require('json-size');
const { OAuthSecretError } = require('@rudderstack/integrations-lib');
const {
defaultRequestConfig,
defaultPostRequestConfig,
getSuccessRespEvents,
removeUndefinedAndNullAndEmptyValues,
} = require('../../util');
const { MAX_PAYLOAD_SIZE_IN_BYTES, BASE_URL, MAX_OPERATIONS } = require('./config');
const { getAuthHeaderForRequest } = require('../twitter_ads/util');
const { JSON_MIME_TYPE } = require('../../util/constant');

const getOAuthFields = ({ secret }) => {
if (!secret) {
throw new OAuthSecretError('[X Audience]:: OAuth - access keys not found');

Check warning on line 18 in src/v0/destinations/x_audience/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/x_audience/utils.js#L18

Added line #L18 was not covered by tests
}
const oAuthObject = {
consumerKey: secret.consumerKey,
consumerSecret: secret.consumerSecret,
accessToken: secret.accessToken,
accessTokenSecret: secret.accessTokenSecret,
};
return oAuthObject;
};

// Docs: https://developer.x.com/en/docs/x-ads-api/audiences/api-reference/custom-audience-user
const buildResponseWithJSON = (JSON, config, metadata) => {
const response = defaultRequestConfig();
response.endpoint = BASE_URL.replace(':account_id', config.accountId).replace(
':audience_id',
config.audienceId,
);
response.method = defaultPostRequestConfig.requestMethod;
response.body.JSON = JSON;
// required to be in accordance with oauth package
const request = {
url: response.endpoint,
method: response.method,
body: response.body.JSON,
};

const oAuthObject = getOAuthFields(metadata);
const authHeader = getAuthHeaderForRequest(request, oAuthObject).Authorization;
response.headers = {
Authorization: authHeader,
'Content-Type': JSON_MIME_TYPE,
};
return response;
};

/**
* This fucntion groups the response list based upoin 3 fields that are
* 1. operation_type
* 2. effective_at
* 3. expires_at
* @param {*} respList
* @returns object
*/
const groupResponsesUsingOperationAndTime = (respList) => {
const eventGroups = lodash.groupBy(respList, (item) => [
item.message.operation_type,
item.message.params.effective_at,
item.message.params.expires_at,
]);
return eventGroups;
};
/**
* This function groups the operation object list based upon max sized or batch size allowed
* and returns the final batched request
* @param {*} operationObjectList
*/
const getFinalResponseList = (operationObjectList, destination) => {
const respList = [];
let currentMetadataList = [];
let currentBatchedRequest = [];
let metadataWithSecret; // used for authentication purposes
operationObjectList.forEach((operationObject) => {
const { payload, metadataList } = operationObject;
metadataWithSecret = { secret: metadataList[0].secret };
if (
currentBatchedRequest.length >= MAX_OPERATIONS ||
jsonSize([...currentBatchedRequest, payload]) > MAX_PAYLOAD_SIZE_IN_BYTES
) {
respList.push(
getSuccessRespEvents(
buildResponseWithJSON(currentBatchedRequest, destination.config, metadataWithSecret),
currentMetadataList,
destination,
true,
),
);
currentBatchedRequest = [payload];
currentMetadataList = metadataList;
} else {
currentBatchedRequest.push(payload);
currentMetadataList.push(...metadataList);
}
});
// pushing the remainder operation payloads as well
respList.push(
getSuccessRespEvents(
buildResponseWithJSON(currentBatchedRequest, destination.config, metadataWithSecret),
currentMetadataList,
destination,
true,
),
);
return respList;
};

/**
* This function takes in object containing key as the grouped parameter
* and values as list of all concerned payloads ( having the same key ).
* Then it makes the list of operationObject based upon
* operation and effective and expires time and json size of payload of one object
* @param {*} eventGroups
* @returns
*/
const getOperationObjectList = (eventGroups) => {
const operationList = [];
Object.keys(eventGroups).forEach((group) => {
const { operation_type, params } = eventGroups[group][0].message;
const { effective_at, expires_at } = params;
let currentUserList = [];
let currentMetadata = [];
eventGroups[group].forEach((event) => {
const newUsers = event.message.params.users;
// calculating size before appending the user and metadata list
if (jsonSize([...currentUserList, ...newUsers]) < MAX_PAYLOAD_SIZE_IN_BYTES) {
currentUserList.push(...event.message.params.users);
currentMetadata.push(event.metadata);
} else {
operationList.push({

Check warning on line 136 in src/v0/destinations/x_audience/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/x_audience/utils.js#L135-L136

Added lines #L135 - L136 were not covered by tests
payload: {
operation_type,
params: removeUndefinedAndNullAndEmptyValues({
effective_at,
expires_at,
users: currentUserList,
}),
},
metadataList: currentMetadata,
});
currentUserList = event.message.params.users;
currentMetadata = event.metadata;

Check warning on line 148 in src/v0/destinations/x_audience/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/x_audience/utils.js#L147-L148

Added lines #L147 - L148 were not covered by tests
}
});
// all the remaining user and metadata list used in one list
operationList.push({
payload: {
operation_type,
params: removeUndefinedAndNullAndEmptyValues({
effective_at,
expires_at,
users: currentUserList,
}),
},
metadataList: currentMetadata,
});
});
return operationList;
};

/**
* Input: [{
message: {
operation_type: 'Delete',
params: {
effective_at,
expires_at,
users,
},
},
metadata,
destination,
}]
* @param {*} responseList
*/
const batchEvents = (responseList, destination) => {
const eventGroups = groupResponsesUsingOperationAndTime(responseList);
const operationObjectList = getOperationObjectList(eventGroups);
/* at this point we will a list of json payloads in the following format
operationObjectList = [
{
payload:{
operation_type: 'Delete',
params: {
effective_at,
expires_at,
users,
},
metadata:
[
{jobId:1}, {jobId:2}
]
}
]
*/
return getFinalResponseList(operationObjectList, destination);
};

const getUserDetails = (fields, config) => {
const { enableHash } = config;
const { email, phone_number, handle, device_id, twitter_id, partner_user_id } = fields;
const user = {};
if (email) {
const emailList = email.split(',');
user.email = enableHash ? emailList.map(sha256) : emailList;
}
if (phone_number) {
const phone_numberList = phone_number.split(',');
user.phone_number = enableHash ? phone_numberList.map(sha256) : phone_numberList;
}
if (handle) {
const handleList = handle.split(',');
user.handle = enableHash ? handleList.map(sha256) : handleList;
}
if (device_id) {
const device_idList = device_id.split(',');
user.device_id = enableHash ? device_idList.map(sha256) : device_idList;
}
if (twitter_id) {
const twitter_idList = twitter_id.split(',');
user.twitter_id = enableHash ? twitter_idList.map(sha256) : twitter_idList;
}
if (partner_user_id) {
user.partner_user_id = partner_user_id.split(',');
}
return removeUndefinedAndNullAndEmptyValues(user);
};
module.exports = { getOAuthFields, batchEvents, getUserDetails, buildResponseWithJSON };
28 changes: 28 additions & 0 deletions test/integrations/destinations/x_audience/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export const authHeaderConstant =
'"OAuth oauth_consumer_key="validConsumerKey", oauth_nonce="j8kZvaJQRTaLX8h460CgHNs6rCEArNOW", oauth_signature="uAu%2FGdA6qPGW88pjVd7%2FgnAlHtM%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1725014809", oauth_token="validAccessToken", oauth_version="1.0"';

export const destination = {
config: {
accountId: '1234',
audienceId: 'dummyId',
},
ID: 'xpixel-1234',
};

export const generateMetadata = (jobId: number, userId?: string): any => {
return {
jobId,
attemptNum: 1,
userId: userId || 'default-userId',
sourceId: 'default-sourceId',
destinationId: 'default-destinationId',
workspaceId: 'default-workspaceId',
secret: {
consumerKey: 'validConsumerKey',
consumerSecret: 'validConsumerSecret',
accessToken: 'validAccessToken',
accessTokenSecret: 'validAccessTokenSecret',
},
dontBatch: false,
};
};
Loading
Loading