Skip to content

Commit

Permalink
feat: onboard Amazon Audience (#3727)
Browse files Browse the repository at this point in the history
  • Loading branch information
anantjain45823 authored Oct 8, 2024
1 parent 6d9976c commit 5ac8186
Show file tree
Hide file tree
Showing 12 changed files with 1,207 additions and 0 deletions.
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
"ajv-formats": "^2.1.1",
"amazon-dsp-formatter": "^1.0.2",
"axios": "^1.7.3",
"btoa": "^1.2.1",
"component-each": "^0.2.6",
Expand Down
1 change: 1 addition & 0 deletions src/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"BLOOMREACH_CATALOG": true,
"SMARTLY": true,
"HTTP": true,
"AMAZON_AUDIENCE": true,
"INTERCOM_V2": true
},
"regulations": [
Expand Down
5 changes: 5 additions & 0 deletions src/v0/destinations/amazon_audience/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const CREATE_USERS_URL = 'https://advertising-api.amazon.com/dp/records/hashed/';
const ASSOCIATE_USERS_URL = 'https://advertising-api.amazon.com/v2/dp/audience';
const MAX_PAYLOAD_SIZE_IN_BYTES = 4000000;
const DESTINATION = 'amazon_audience';
module.exports = { CREATE_USERS_URL, MAX_PAYLOAD_SIZE_IN_BYTES, ASSOCIATE_USERS_URL, DESTINATION };
139 changes: 139 additions & 0 deletions src/v0/destinations/amazon_audience/networkHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
const {
NetworkError,
ThrottledError,
AbortedError,
RetryableError,
} = require('@rudderstack/integrations-lib');
const { prepareProxyRequest, handleHttpRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess } = require('../../util/index');
const {
processAxiosResponse,
getDynamicErrorType,
} = require('../../../adapters/utils/networkUtils');
const { REFRESH_TOKEN } = require('../../../adapters/networkhandler/authConstants');
const { DESTINATION, CREATE_USERS_URL, ASSOCIATE_USERS_URL } = require('./config');
const { TAG_NAMES } = require('../../util/tags');

const amazonAudienceRespHandler = (destResponse, stageMsg) => {
const { status, response } = destResponse;

// to handle the case when authorization-token is invalid
// docs for error codes: https://advertising.amazon.com/API/docs/en-us/reference/concepts/errors#tag/Audiences/operation/dspCreateAudiencesPost
if (status === 401 && response.message === 'Unauthorized') {
// 401 takes place in case of authorization isue meaning token is epxired or access is not enough.
// Since acces is configured from dashboard only refresh token makes sense
throw new NetworkError(
`${response?.message} ${stageMsg}`,
status,
{
[TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
response,
REFRESH_TOKEN,
);
} else if (status === 429) {
throw new ThrottledError(
`Request Failed: ${stageMsg} - due to Request Limit exceeded, (Throttled)`,
destResponse,
);
} else if (status === 504 || status === 502 || status === 500) {
// see if its 5xx internal error
throw new RetryableError(`Request Failed: ${stageMsg} (Retryable)`, 500, destResponse);
}
// else throw the error
throw new AbortedError(
`Request Failed: ${stageMsg} with status "${status}" due to "${JSON.stringify(
response,
)}", (Aborted) `,
400,
destResponse,
);
};

const responseHandler = (responseParams) => {
const { destinationResponse } = responseParams;
const message = `[${DESTINATION} Response Handler] - Request Processed Successfully`;
const { status } = destinationResponse;

if (!isHttpStatusSuccess(status)) {
// if error, successfully return status, message and original destination response
amazonAudienceRespHandler(
destinationResponse,
'during amazon_audience response transformation',
);
}
return {
status,
message,
destinationResponse,
};
};

const makeRequest = async (url, data, headers, metadata, method, args) => {
const { httpResponse } = await handleHttpRequest(method, url, data, { headers }, args);
return httpResponse;
};

const amazonAudienceProxyRequest = async (request) => {
const { body, metadata } = request;
const { headers } = request;
const { createUsers, associateUsers } = body.JSON;

// step 1: Create users
const firstResponse = await makeRequest(
CREATE_USERS_URL,
createUsers,
headers,
metadata,
'post',
{
destType: 'amazon_audience',
feature: 'proxy',
requestMethod: 'POST',
module: 'dataDelivery',
endpointPath: '/records/hashed',
metadata,
},
);
// validate response success
if (!firstResponse.success && !isHttpStatusSuccess(firstResponse?.response?.status)) {
amazonAudienceRespHandler(
{
response: firstResponse.response?.response?.data || firstResponse,
status: firstResponse.response?.response?.status || firstResponse,
},
'during creating users',
);
}
// we are returning above in case of failure because if first step is not executed then there is no sense of executing second step
// step2: Associate Users to Audience Id
const secondResponse = await makeRequest(
ASSOCIATE_USERS_URL,
associateUsers,
headers,
metadata,
'patch',
{
destType: 'amazon_audience',
feature: 'proxy',
requestMethod: 'PATCH',
module: 'dataDelivery',
endpointPath: '/v2/dp/audience',
metadata,
},
);
return secondResponse;
};
// eslint-disable-next-line @typescript-eslint/naming-convention
class networkHandler {
constructor() {
this.responseHandler = responseHandler;
this.proxy = amazonAudienceProxyRequest;
this.prepareProxy = prepareProxyRequest;
this.processAxiosResponse = processAxiosResponse;
}
}

module.exports = {
networkHandler,
};
58 changes: 58 additions & 0 deletions src/v0/destinations/amazon_audience/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/* eslint-disable @typescript-eslint/naming-convention */
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { handleRtTfSingleEventError } = require('../../util');
const { batchEvents, buildResponseWithUsers, getUserDetails } = require('./utils');
/**
* This function returns the user traits list required in request for
* making a call to create a group of users in amazon_audience
* @param {*} record
* @param {*} destination
* @param {*} metadata
*/
const processRecord = (record, config) => {
const { fields, action, type } = record;
if (type !== 'record') {
throw new InstrumentationError(`[AMAZON AUDIENCE]: ${type} is not supported`);
}
return { user: getUserDetails(fields, config), action: action !== 'delete' ? 'add' : 'remove' };
};

// This function is used to process a single record
const process = (event) => {
const { message, destination, metadata } = event;
const { Config } = destination;
const { user, action } = processRecord(message, Config);
return buildResponseWithUsers([user], action, Config, [metadata.jobId], metadata.secret);
};
// This function is used to process multiple records
const processRouterDest = async (inputs, reqMetadata) => {
const responseList = []; // list containing all successful responses
const errorRespList = []; // list of error
const { destination } = inputs[0];
const { Config } = destination;
inputs.map(async (event) => {
try {
if (event.message.statusCode) {
// already transformed event
responseList.push(event);
} else {
// if not transformed
responseList.push({
message: processRecord(event.message, Config),
metadata: event.metadata,
destination,
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
errorRespList.push(errRespEvent);
}
});
let batchedResponseList = [];
if (responseList.length > 0) {
batchedResponseList = batchEvents(responseList, destination);
}
return [...batchedResponseList, ...errorRespList];
};

module.exports = { process, processRouterDest };
Loading

0 comments on commit 5ac8186

Please sign in to comment.