diff --git a/src/v0/destinations/x_audience/config.js b/src/v0/destinations/x_audience/config.js index 10c52bc638..37b35c00b7 100644 --- a/src/v0/destinations/x_audience/config.js +++ b/src/v0/destinations/x_audience/config.js @@ -1,5 +1,5 @@ const BASE_URL = - 'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:custom_audience_id/users'; + 'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:audience_id/users'; const MAX_PAYLOAD_SIZE_IN_BYTES = 4000000; const MAX_OPERATIONS = 2500; module.exports = { BASE_URL, MAX_PAYLOAD_SIZE_IN_BYTES, MAX_OPERATIONS }; diff --git a/src/v0/destinations/x_audience/transform.js b/src/v0/destinations/x_audience/transform.js index 4add82238d..5ee73b108c 100644 --- a/src/v0/destinations/x_audience/transform.js +++ b/src/v0/destinations/x_audience/transform.js @@ -1,39 +1,10 @@ /* eslint-disable @typescript-eslint/naming-convention */ const { removeUndefinedAndNullAndEmptyValues, - handleRtTfSingleEventError, InstrumentationError, } = require('@rudderstack/integrations-lib'); -const { getAuthHeaderForRequest } = require('../twitter_ads/util'); -const { defaultRequestConfig, defaultPostRequestConfig } = require('../../util'); -const { BASE_URL } = require('./config'); -const { JSON_MIME_TYPE } = require('../../util/constant'); -const { getOAuthFields, batchEvents, getUserDetails } = require('./utils'); - -// Docs: https://developer.x.com/en/docs/x-ads-api/audiences/api-reference/custom-audience-user -const buildResponseWithJSON = (JSON, config, metadata) => { - const response = defaultRequestConfig(); - response.endpoint = BASE_URL.replace(':account_id', config.accountId).replace( - ':custom_audience_id', - config.audienceId, - ); - response.method = defaultPostRequestConfig.requestMethod; - response.body.JSON = JSON; - // required to be in accordance with oauth package - const request = { - url: response.endpoint, - method: response.method, - body: response.body.JSON, - }; - - const oAuthObject = getOAuthFields(metadata); - const authHeader = getAuthHeaderForRequest(request, oAuthObject).Authorization; - response.headers = { - Authorization: authHeader, - 'Content-Type': JSON_MIME_TYPE, - }; - return response; -}; +const { handleRtTfSingleEventError } = require('../../util'); +const { batchEvents, buildResponseWithJSON, getUserDetails } = require('./utils'); /** * This function returns audience object in the form of destination API * @param {*} message @@ -41,7 +12,10 @@ const buildResponseWithJSON = (JSON, config, metadata) => { * @param {*} metadata */ const processRecordEvent = (message, config) => { - const { fields, action } = message; + const { fields, action, type } = message; + if (type !== 'record') { + throw new InstrumentationError(`[X AUDIENCE]: ${type} is not supported`); + } const { effective_at, expires_at } = fields; const users = [getUserDetails(fields, config)]; @@ -57,10 +31,6 @@ const processRecordEvent = (message, config) => { const process = (event) => { const { message, destination, metadata } = event; const { config } = destination; - - if (message.type !== 'record') { - throw new InstrumentationError(`[X AUDIENCE]: ${message.type} is not supported`); - } const payload = [processRecordEvent(message, config)]; return buildResponseWithJSON(payload, config, metadata); }; @@ -86,12 +56,9 @@ const processRouterDest = async (inputs, reqMetadata) => { errorRespList.push(errRespEvent); } }); - const batchedResponseList = []; + let batchedResponseList = []; if (responseList.length > 0) { - const batchedEvents = batchEvents(responseList, destination); - batchedEvents.forEach((batch) => { - batchedResponseList.push(buildResponseWithJSON(batch)); - }); + batchedResponseList = batchEvents(responseList, destination); } return [...batchedResponseList, ...errorRespList]; }; diff --git a/src/v0/destinations/x_audience/utils.js b/src/v0/destinations/x_audience/utils.js index c7fef72e46..691005442f 100644 --- a/src/v0/destinations/x_audience/utils.js +++ b/src/v0/destinations/x_audience/utils.js @@ -3,13 +3,19 @@ const sha256 = require('sha256'); const lodash = require('lodash'); const jsonSize = require('json-size'); const { OAuthSecretError } = require('@rudderstack/integrations-lib'); -const { getSuccessRespEvents, removeUndefinedAndNullAndEmptyValues } = require('../../util'); -const { MAX_PAYLOAD_SIZE_IN_BYTES, MAX_OPERATIONS } = require('./config'); -const { buildResponseWithJSON } = require('./transform'); +const { + defaultRequestConfig, + defaultPostRequestConfig, + getSuccessRespEvents, + removeUndefinedAndNullAndEmptyValues, +} = require('../../util'); +const { MAX_PAYLOAD_SIZE_IN_BYTES, BASE_URL, MAX_OPERATIONS } = require('./config'); +const { getAuthHeaderForRequest } = require('../twitter_ads/util'); +const { JSON_MIME_TYPE } = require('../../util/constant'); const getOAuthFields = ({ secret }) => { if (!secret) { - throw new OAuthSecretError('[TWITTER ADS]:: OAuth - access keys not found'); + throw new OAuthSecretError('[X Audience]:: OAuth - access keys not found'); } const oAuthObject = { consumerKey: secret.consumerKey, @@ -20,6 +26,31 @@ const getOAuthFields = ({ secret }) => { return oAuthObject; }; +// Docs: https://developer.x.com/en/docs/x-ads-api/audiences/api-reference/custom-audience-user +const buildResponseWithJSON = (JSON, config, metadata) => { + const response = defaultRequestConfig(); + response.endpoint = BASE_URL.replace(':account_id', config.accountId).replace( + ':audience_id', + config.audienceId, + ); + response.method = defaultPostRequestConfig.requestMethod; + response.body.JSON = JSON; + // required to be in accordance with oauth package + const request = { + url: response.endpoint, + method: response.method, + body: response.body.JSON, + }; + + const oAuthObject = getOAuthFields(metadata); + const authHeader = getAuthHeaderForRequest(request, oAuthObject).Authorization; + response.headers = { + Authorization: authHeader, + 'Content-Type': JSON_MIME_TYPE, + }; + return response; +}; + /** * This fucntion groups the response list based upoin 3 fields that are * 1. operation_type @@ -50,7 +81,7 @@ const getFinalResponseList = (operationObjectList, destination) => { const { payload, metadataList } = operationObject; metadataWithSecret = { secret: metadataList[0].secret }; if ( - currentBatchedRequest.length > MAX_OPERATIONS || + currentBatchedRequest.length >= MAX_OPERATIONS || jsonSize([...currentBatchedRequest, payload]) > MAX_PAYLOAD_SIZE_IN_BYTES ) { respList.push( @@ -61,7 +92,7 @@ const getFinalResponseList = (operationObjectList, destination) => { true, ), ); - currentBatchedRequest = [operationObject]; + currentBatchedRequest = [payload]; currentMetadataList = metadataList; } else { currentBatchedRequest.push(payload); @@ -91,41 +122,41 @@ const getFinalResponseList = (operationObjectList, destination) => { const getOperationObjectList = (eventGroups) => { const operationList = []; Object.keys(eventGroups).forEach((group) => { - const { operation, params } = group[0].message; + const { operation_type, params } = eventGroups[group][0].message; const { effective_at, expires_at } = params; let currentUserList = []; let currentMetadata = []; - group.forEach((event) => { + eventGroups[group].forEach((event) => { const newUsers = event.message.params.users; // calculating size before appending the user and metadata list - if (jsonSize([...currentUserList, ...newUsers]).length > MAX_PAYLOAD_SIZE_IN_BYTES) { + if (jsonSize([...currentUserList, ...newUsers]) < MAX_PAYLOAD_SIZE_IN_BYTES) { currentUserList.push(...event.message.params.users); currentMetadata.push(event.metadata); } else { operationList.push({ payload: { - operation_type: operation, - params: { + operation_type, + params: removeUndefinedAndNullAndEmptyValues({ effective_at, expires_at, users: currentUserList, - }, + }), }, metadataList: currentMetadata, }); + currentUserList = event.message.params.users; + currentMetadata = event.metadata; } - currentUserList = event.message.params.users; - currentMetadata = event.metadata; }); // all the remaining user and metadata list used in one list operationList.push({ payload: { - operation_type: operation, - params: { + operation_type, + params: removeUndefinedAndNullAndEmptyValues({ effective_at, expires_at, users: currentUserList, - }, + }), }, metadataList: currentMetadata, }); @@ -200,4 +231,4 @@ const getUserDetails = (fields, config) => { } return removeUndefinedAndNullAndEmptyValues(user); }; -module.exports = { getOAuthFields, batchEvents, getUserDetails }; +module.exports = { getOAuthFields, batchEvents, getUserDetails, buildResponseWithJSON }; diff --git a/test/integrations/destinations/x_audience/common.ts b/test/integrations/destinations/x_audience/common.ts new file mode 100644 index 0000000000..acaf46a889 --- /dev/null +++ b/test/integrations/destinations/x_audience/common.ts @@ -0,0 +1,28 @@ +export const authHeaderConstant = + '"OAuth oauth_consumer_key="validConsumerKey", oauth_nonce="j8kZvaJQRTaLX8h460CgHNs6rCEArNOW", oauth_signature="uAu%2FGdA6qPGW88pjVd7%2FgnAlHtM%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1725014809", oauth_token="validAccessToken", oauth_version="1.0"'; + +export const destination = { + config: { + accountId: '1234', + audienceId: 'dummyId', + }, + ID: 'xpixel-1234', +}; + +export const generateMetadata = (jobId: number, userId?: string): any => { + return { + jobId, + attemptNum: 1, + userId: userId || 'default-userId', + sourceId: 'default-sourceId', + destinationId: 'default-destinationId', + workspaceId: 'default-workspaceId', + secret: { + consumerKey: 'validConsumerKey', + consumerSecret: 'validConsumerSecret', + accessToken: 'validAccessToken', + accessTokenSecret: 'validAccessTokenSecret', + }, + dontBatch: false, + }; +}; diff --git a/test/integrations/destinations/x_audience/processor/data.ts b/test/integrations/destinations/x_audience/processor/data.ts index 69f7928af3..6cf8aad98a 100644 --- a/test/integrations/destinations/x_audience/processor/data.ts +++ b/test/integrations/destinations/x_audience/processor/data.ts @@ -1,35 +1,13 @@ -import { getOAuthFields, batchEvents } from '../../../../../src/v0/destinations/x_audience/utils'; +import { destination, authHeaderConstant, generateMetadata } from '../common'; -const authHeaderConstant = - '"OAuth oauth_consumer_key="validConsumerKey", oauth_nonce="j8kZvaJQRTaLX8h460CgHNs6rCEArNOW", oauth_signature="uAu%2FGdA6qPGW88pjVd7%2FgnAlHtM%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1725014809", oauth_token="validAccessToken", oauth_version="1.0"'; - -const destination = { - config: { - username: 'wootricfakeuser@example.com', - password: 'password@123', - accountToken: 'NPS-dummyToken12', - accountId: '1234', - audienceId: 'dummyId', - }, - ID: 'wootric-1234', -}; -const generateMetadata = (jobId: number, userId?: string): any => { - return { - jobId, - attemptNum: 1, - userId: userId || 'default-userId', - sourceId: 'default-sourceId', - destinationId: 'default-destinationId', - workspaceId: 'default-workspaceId', - secret: { - consumerKey: 'validConsumerKey', - consumerSecret: 'validConsumerSecret', - accessToken: 'validAccessToken', - accessTokenSecret: 'validAccessTokenSecret', - }, - dontBatch: false, - }; +const fields = { + email: 'abc@xyz.com,a+1@xyz.com', + phone_number: '98765433232,21323', + handle: '@abc,@xyz', + twitter_id: 'tid1,tid2', + partner_user_id: 'puid1,puid2', }; + export const data = [ { name: 'x_audience', @@ -47,16 +25,11 @@ export const data = [ type: 'record', action: 'insert', fields: { - email: 'abc@xyz.com,a+1@xyz.com', - phone_number: '98765433232,21323', - handle: '@abc,@xyz', + ...fields, device_id: 'did123,did456', - twitter_id: 'tid1,tid2', - partner_user_id: 'puid1,puid2', effective_at: '2024-05-15T00:00:00Z', expires_at: '2025-05-15T00:00:00Z', }, - channel: 'sources', context: {}, recordId: '1', }, @@ -145,13 +118,7 @@ export const data = [ message: { type: 'record', action: 'delete', - fields: { - email: 'abc@xyz.com,a+1@xyz.com', - phone_number: '98765433232,21323', - handle: '@abc,@xyz', - twitter_id: 'tid1,tid2', - partner_user_id: 'puid1,puid2', - }, + fields, channel: 'sources', context: {}, recordId: '1', @@ -219,7 +186,7 @@ export const data = [ request: { body: [ { - destination: { ...destination, config: { ...destination.config, enableHash: true } }, + destination, message: { type: 'identify', context: {}, diff --git a/test/integrations/destinations/x_audience/router/data.ts b/test/integrations/destinations/x_audience/router/data.ts new file mode 100644 index 0000000000..a4db348219 --- /dev/null +++ b/test/integrations/destinations/x_audience/router/data.ts @@ -0,0 +1,231 @@ +import { destination, authHeaderConstant, generateMetadata } from '../common'; + +export const data = [ + { + name: 'x_audience', + id: 'router-test-1', + description: + 'case with 2 record with no effective and expire at date with insert operations, 4 insert with 2 each having same effective and expire at and one delete and one failure event', + feature: 'router', + module: 'destination', + version: 'v0', + input: { + request: { + body: { + input: [ + { + destination, + message: { + type: 'record', + action: 'delete', + fields: { email: 'email1@abc.com' }, + channel: 'sources', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(1), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { email: 'email2@abc.com' }, + channel: 'sources', + context: {}, + recordId: '2', + }, + metadata: generateMetadata(2), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { email: 'email3@abc.com' }, + channel: 'sources', + context: {}, + recordId: '3', + }, + metadata: generateMetadata(3), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + email: 'email4@abc.com', + expires_at: 'some date', + effective_at: 'some effective date', + }, + channel: 'sources', + context: {}, + recordId: '4', + }, + metadata: generateMetadata(4), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + email: 'email5@abc.com', + expires_at: 'some date', + effective_at: 'some effective date', + }, + channel: 'sources', + context: {}, + recordId: '5', + }, + metadata: generateMetadata(5), + }, + { + destination, + message: { + type: 'identify', + context: {}, + recordId: '1', + }, + metadata: generateMetadata(6), + }, + ], + destType: 'x_audience', + }, + method: 'POST', + }, + }, + output: { + response: { + status: 200, + body: { + output: [ + { + batched: true, + batchedRequest: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: [ + { + operation_type: 'Delete', + params: { + users: [ + { + email: ['email1@abc.com'], + }, + ], + }, + }, + { + operation_type: 'Update', + params: { + users: [ + { + email: ['email2@abc.com'], + }, + { + email: ['email3@abc.com'], + }, + ], + }, + }, + ], + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + files: {}, + }, + destination, + metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)], + statusCode: 200, + }, + { + batched: true, + batchedRequest: { + version: '1', + type: 'REST', + method: 'POST', + endpoint: + 'https://ads-api.twitter.com/12/accounts/1234/custom_audiences/dummyId/users', + headers: { + Authorization: authHeaderConstant, + 'Content-Type': 'application/json', + }, + params: {}, + body: { + JSON: [ + { + operation_type: 'Update', + params: { + expires_at: 'some date', + effective_at: 'some effective date', + users: [ + { + email: ['email4@abc.com'], + }, + { + email: ['email5@abc.com'], + }, + ], + }, + }, + ], + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + files: {}, + }, + destination, + metadata: [generateMetadata(4), generateMetadata(5)], + statusCode: 200, + }, + { + metadata: [generateMetadata(6)], + destination, + batched: false, + statusCode: 400, + error: '[X AUDIENCE]: identify is not supported', + statTags: { + errorCategory: 'dataValidation', + destinationId: 'default-destinationId', + errorType: 'instrumentation', + destType: 'X_AUDIENCE', + module: 'destination', + implementation: 'native', + workspaceId: 'default-workspaceId', + feature: 'router', + }, + }, + ], + }, + }, + }, + }, +].map((tc) => ({ + ...tc, + mockFns: (_) => { + jest.mock('../../../../../src/v0/destinations/twitter_ads/util', () => ({ + getAuthHeaderForRequest: (_a, _b) => { + return { Authorization: authHeaderConstant }; + }, + })); + jest.mock('../../../../../src/v0/destinations/x_audience/config', () => ({ + BASE_URL: + 'https://ads-api.twitter.com/12/accounts/:account_id/custom_audiences/:audience_id/users', + MAX_PAYLOAD_SIZE_IN_BYTES: 40000, + MAX_OPERATIONS: 2, + })); + }, +}));