Skip to content

Commit

Permalink
feat: garl record event support (#3403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vikas26021999 authored Jun 18, 2024
1 parent 546f2c1 commit 60fee0e
Show file tree
Hide file tree
Showing 11 changed files with 1,329 additions and 473 deletions.
53 changes: 8 additions & 45 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const {
InstrumentationError,
ConfigurationError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const stats = require('../../../util/stats');
Expand All @@ -15,8 +11,8 @@ const {
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
generateErrorObject,
} = require('../../util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const {
ensureApplicableFormat,
getUpdatedDataElement,
Expand All @@ -26,19 +22,6 @@ const {
getDataSource,
} = require('./util');

function getErrorMetaData(inputs, acceptedOperations) {
const metadata = [];
// eslint-disable-next-line no-restricted-syntax
for (const key in inputs) {
if (!acceptedOperations.includes(key)) {
inputs[key].forEach((input) => {
metadata.push(input.metadata);
});
}
}
return metadata;
}

const processRecordEventArray = (
recordChunksArray,
userSchema,
Expand Down Expand Up @@ -177,8 +160,6 @@ async function processRecordInputs(groupedRecordInputs) {
record.message.action?.toLowerCase(),
);

const finalResponse = [];

let insertResponse;
let deleteResponse;
let updateResponse;
Expand Down Expand Up @@ -238,32 +219,14 @@ async function processRecordInputs(groupedRecordInputs) {
);
}

const eventTypes = ['update', 'insert', 'delete'];
const errorMetaData = [];
const errorMetaDataObject = getErrorMetaData(groupedRecordsByAction, eventTypes);
if (errorMetaDataObject.length > 0) {
errorMetaData.push(errorMetaDataObject);
}
const errorResponse = getErrorResponse(groupedRecordsByAction);

const error = new InstrumentationError('Invalid action type in record event');
const errorObj = generateErrorObject(error);
const errorResponseList = errorMetaData.map((metadata) =>
getErrorRespEvents(metadata, errorObj.status, errorObj.message, errorObj.statTags),
const finalResponse = createFinalResponse(
deleteResponse,
insertResponse,
updateResponse,
errorResponse,
);

if (deleteResponse && deleteResponse.batchedRequest.length > 0) {
finalResponse.push(deleteResponse);
}
if (insertResponse && insertResponse.batchedRequest.length > 0) {
finalResponse.push(insertResponse);
}
if (updateResponse && updateResponse.batchedRequest.length > 0) {
finalResponse.push(updateResponse);
}
if (errorResponseList.length > 0) {
finalResponse.push(...errorResponseList);
}

if (finalResponse.length === 0) {
throw new InstrumentationError(
'Missing valid parameters, unable to generate transformed payload',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { getMappingConfig } = require('../../util');

const BASE_ENDPOINT = 'https://googleads.googleapis.com/v15/customers';
const BASE_ENDPOINT = 'https://googleads.googleapis.com/v16/customers';
const CONFIG_CATEGORIES = {
AUDIENCE_LIST: { type: 'audienceList', name: 'offlineDataJobs' },
ADDRESSINFO: { type: 'addressInfo', name: 'addressInfo' },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const {
getValueFromMessage,
getAccessToken,
constructPayload,
returnArrayOfSubarrays,
getSuccessRespEvents,
} = require('../../util');
const { populateConsentFromConfig } = require('../../util/googleUtils');
const { populateIdentifiers, responseBuilder } = require('./util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const { offlineDataJobsMapping, consentConfigMap } = require('./config');

const processRecordEventArray = (
records,
message,
destination,
accessToken,
developerToken,
operationType,
) => {
let outputPayloads = {};
// ** only send it if identifier > 0

const fieldsArray = [];
const metadata = [];
records.forEach((record) => {
fieldsArray.push(record.message.fields);
metadata.push(record.metadata);
});

const userIdentifiersList = populateIdentifiers(fieldsArray, destination);

const outputPayload = constructPayload(message, offlineDataJobsMapping);
outputPayload.operations = [];
// breaking the userIdentiFier array in chunks of 20
const userIdentifierChunks = returnArrayOfSubarrays(userIdentifiersList, 20);
// putting each chunk in different create/remove operations
switch (operationType) {
case 'add':
// for add operation
userIdentifierChunks.forEach((element) => {
const operations = {
create: {},
};
operations.create.userIdentifiers = element;
outputPayload.operations.push(operations);
});
outputPayloads = { ...outputPayloads, create: outputPayload };
break;
case 'remove':
// for remove operation
userIdentifierChunks.forEach((element) => {
const operations = {
remove: {},
};
operations.remove.userIdentifiers = element;
outputPayload.operations.push(operations);
});
outputPayloads = { ...outputPayloads, remove: outputPayload };
break;
default:
}

const toSendEvents = [];
Object.values(outputPayloads).forEach((data) => {
const consentObj = populateConsentFromConfig(destination.Config, consentConfigMap);
toSendEvents.push(
responseBuilder(accessToken, developerToken, data, destination, message, consentObj),
);
});

const successResponse = getSuccessRespEvents(toSendEvents, metadata, destination, true);

return successResponse;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination, message, metadata } = groupedRecordInputs[0];
const accessToken = getAccessToken(metadata, 'accessToken');
const developerToken = getValueFromMessage(metadata, 'secret.developer_token');

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
record.message.action?.toLowerCase(),
);

let insertResponse;
let deleteResponse;
let updateResponse;

if (groupedRecordsByAction.delete) {
deleteResponse = processRecordEventArray(
groupedRecordsByAction.delete,
message,
destination,
accessToken,
developerToken,
'remove',
);
}

if (groupedRecordsByAction.insert) {
insertResponse = processRecordEventArray(
groupedRecordsByAction.insert,
message,
destination,
accessToken,
developerToken,
'add',
);
}

if (groupedRecordsByAction.update) {
updateResponse = processRecordEventArray(
groupedRecordsByAction.update,
message,
destination,
accessToken,
developerToken,
'add',
);
}

const errorResponse = getErrorResponse(groupedRecordsByAction);
const finalResponse = createFinalResponse(
deleteResponse,
insertResponse,
updateResponse,
errorResponse,
);
if (finalResponse.length === 0) {
throw new InstrumentationError(
'Missing valid parameters, unable to generate transformed payload',
);
}

return finalResponse;
}

module.exports = {
processRecordInputs,
};
Loading

0 comments on commit 60fee0e

Please sign in to comment.