Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: metadata structure correction #3119

Merged
merged 8 commits into from
Feb 26, 2024
283 changes: 29 additions & 254 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"@koa/router": "^12.0.0",
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.6",
"@rudderstack/integrations-lib": "^0.2.2",
"@rudderstack/integrations-lib": "^0.2.4",
"@rudderstack/workflow-engine": "^0.7.2",
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
Expand Down
15 changes: 15 additions & 0 deletions src/controllers/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import logger from '../logger';
import { getIntegrationVersion } from '../util/utils';
import tags from '../v0/util/tags';
import { DynamicConfigParser } from '../util/dynamicConfigParser';
import { checkInvalidRtTfEvents } from '../v0/util';

export class DestinationController {
public static async destinationTransformAtProcessor(ctx: Context) {
Expand Down Expand Up @@ -101,6 +102,20 @@ export class DestinationController {
const routerRequest = ctx.request.body as RouterTransformationRequest;
const destination = routerRequest.destType;
let events = routerRequest.input;
const errorRespEvents = checkInvalidRtTfEvents(events);
if (errorRespEvents.length > 0) {
errorRespEvents[0].metadata = [
{
destType: destination,
},
];
logger.debug(
`[${destination}] Invalid router transform payload structure: ${JSON.stringify(events)}`,
);
ctx.body = { output: errorRespEvents };
ControllerUtility.postProcess(ctx);
return ctx;
}
const metaTags = MiscService.getMetaTags(events[0].metadata);
stats.histogram('dest_transform_input_events', events.length, {
destination,
Expand Down
3 changes: 1 addition & 2 deletions src/legacy/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const Router = require('@koa/router');
const lodash = require('lodash');
const fs = require('fs');
const path = require('path');
const { PlatformError } = require('@rudderstack/integrations-lib');
const { PlatformError, getErrorRespEvents } = require('@rudderstack/integrations-lib');
const logger = require('../logger');
const stats = require('../util/stats');
const { SUPPORTED_VERSIONS, API_VERSION } = require('../routes/utils/constants');
Expand All @@ -18,7 +18,6 @@ const {
isNonFuncObject,
getMetadata,
generateErrorObject,
getErrorRespEvents,
isCdkDestination,
checkAndCorrectUserId,
} = require('../v0/util');
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const { EventType } = require('../../../constants');
const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
} = require('../../util');
Expand Down Expand Up @@ -130,10 +129,6 @@ const processEachTypedEventList = (
};

const processRouterDest = (inputs) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
const finalResp = [];

const batchedEvents = groupEventsByType(inputs);
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const {
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');
Expand Down Expand Up @@ -245,11 +244,6 @@ const batchEvents = (eventChunksArray) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
Expand Down
8 changes: 0 additions & 8 deletions src/v0/destinations/clevertap/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
handleRtTfSingleEventError,
batchMultiplexedEvents,
getSuccessRespEvents,
checkInvalidRtTfEvents,
} = require('../../util');
const { generateClevertapBatchedPayload } = require('./utils');

Expand Down Expand Up @@ -389,13 +388,6 @@ const processEvent = (message, destination) => {
const process = (event) => processEvent(event.message, event.destination);

const processRouterDest = (inputs, reqMetadata) => {
// const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
// return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = [];
const errorRespList = [];
// const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/customerio/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType, MappedToDestinationKey } = require('../../../constants');

const {
getErrorRespEvents,
getSuccessRespEvents,
defaultRequestConfig,
addExternalIdToTraits,
Expand Down Expand Up @@ -174,10 +173,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const {
handleRtTfSingleEventError,
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
Expand Down Expand Up @@ -186,11 +185,6 @@ const batchEvents = (storeSalesEvents) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const storeSalesEvents = []; // list containing store sales events in batched format
const clickCallEvents = []; // list containing click and call events in batched format
const errorRespList = [];
Expand Down
11 changes: 1 addition & 10 deletions src/v0/destinations/google_cloud_function/transform.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
const lodash = require('lodash');
const {
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const { getSuccessRespEvents, handleRtTfSingleEventError } = require('../../util');

const { generateBatchedPayload, validateDestinationConfig } = require('./util');

Expand Down Expand Up @@ -40,11 +36,6 @@ function batchEvents(successRespList, maxBatchSize = 10) {

// Router transform with batching by default
const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successResponseList = [];
const errorRespList = [];
const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/googlesheets/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const {
getValueFromMessage,
getSuccessRespEvents,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
} = require('../../util');

const SOURCE_KEYS = ['properties', 'traits', 'context.traits'];
Expand Down Expand Up @@ -111,10 +110,6 @@ const process = (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
const successRespList = [];
const errorRespList = [];
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
await Promise.all(
inputs.map(async (input) => {
try {
Expand Down
10 changes: 1 addition & 9 deletions src/v0/destinations/hs/transform.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const get = require('get-value');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType } = require('../../../constants');
const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getDestinationExternalIDInfoForRetl,
} = require('../../util');
const { handleRtTfSingleEventError, getDestinationExternalIDInfoForRetl } = require('../../util');
const { API_VERSION } = require('./config');
const {
processLegacyIdentify,
Expand Down Expand Up @@ -71,10 +67,6 @@ const process = async (event) => {
// we are batching by default at routerTransform
const processRouterDest = async (inputs, reqMetadata) => {
let tempInputs = inputs;
const errorRespEvents = checkInvalidRtTfEvents(tempInputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successRespList = [];
const errorRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/iterable/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const {
const {
constructPayload,
defaultRequestConfig,
checkInvalidRtTfEvents,
defaultPostRequestConfig,
handleRtTfSingleEventError,
removeUndefinedAndNullValues,
Expand Down Expand Up @@ -162,11 +161,6 @@ const process = (event) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchedEvents = batchEvents(inputs);
const response = await Promise.all(
batchedEvents.map(async (listOfEvents) => {
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/kafka/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const {
getHashFromArray,
removeUndefinedAndNullValues,
getSuccessRespEvents,
getErrorRespEvents,
} = require('../../util');

const filterConfigTopics = (message, destination) => {
Expand Down Expand Up @@ -38,10 +37,6 @@ const filterConfigTopics = (message, destination) => {

const batch = (destEvents) => {
const respList = [];
if (!Array.isArray(destEvents) || destEvents.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}

// Grouping the events by topic
const groupedEvents = groupBy(destEvents, (event) => event.message.topic);
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const {
addExternalIdToTraits,
adduserIdFromExternalId,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
flattenJson,
isNewStatusCodesAccepted,
Expand Down Expand Up @@ -320,10 +319,6 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
Expand Down
3 changes: 2 additions & 1 deletion src/v0/destinations/lambda/transform.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const _ = require('lodash');
const { getErrorRespEvents, getSuccessRespEvents } = require('../../util');
const { getErrorRespEvents } = require('@rudderstack/integrations-lib');
const { getSuccessRespEvents } = require('../../util');
const { ConfigurationError } = require('@rudderstack/integrations-lib');

const DEFAULT_INVOCATION_TYPE = 'Event'; // asynchronous invocation
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailchimp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const { InstrumentationError, ConfigurationError } = require('@rudderstack/integ
const {
defaultPutRequestConfig,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
constructPayload,
defaultPostRequestConfig,
isDefinedAndNotNull,
Expand Down Expand Up @@ -162,10 +161,6 @@ const getEventChunks = (event, identifyRespList, trackRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const identifyRespList = [];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailjet/transform.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const lodash = require('lodash');
const { TransformationError, InstrumentationError } = require('@rudderstack/integrations-lib');
const {
getErrorRespEvents,
getSuccessRespEvents,
defaultRequestConfig,
defaultPostRequestConfig,
Expand Down Expand Up @@ -121,10 +120,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/mailmodo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const {
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getErrorRespEvents,
getSuccessRespEvents,
handleRtTfSingleEventError,
} = require('../../util');
Expand Down Expand Up @@ -191,11 +190,6 @@ function getEventChunks(event, identifyEventChunks, eventResponseList) {
}

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}

const identifyEventChunks = []; // list containing identify events in batched format
const eventResponseList = []; // list containing other events in batched format
const errorRespList = [];
Expand Down
7 changes: 1 addition & 6 deletions src/v0/destinations/marketo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const {
InstrumentationError,
ConfigurationError,
UnauthorizedError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const stats = require('../../../util/stats');
const { EventType, MappedToDestinationKey } = require('../../../constants');
Expand All @@ -28,10 +29,8 @@ const {
getFieldValueFromMessage,
getDestinationExternalID,
getSuccessRespEvents,
getErrorRespEvents,
isDefinedAndNotNull,
generateErrorObject,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const Cache = require('../../util/cache');
Expand Down Expand Up @@ -456,10 +455,6 @@ const process = async (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
// Token needs to be generated for marketo which will be done on input level.
// If destination information is not present Error should be thrown
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let token;
try {
token = await getAuthToken(formatConfig(inputs[0].destination));
Expand Down
12 changes: 6 additions & 6 deletions src/v0/destinations/marketo_static_list/transform.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
const lodash = require('lodash');
const cloneDeep = require('lodash/cloneDeep');
const { InstrumentationError, UnauthorizedError } = require('@rudderstack/integrations-lib');
const {
InstrumentationError,
UnauthorizedError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const {
defaultPostRequestConfig,
defaultDeleteRequestConfig,
Expand All @@ -9,11 +13,7 @@ const {
} = require('../../util');
const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant');
const { getIds, validateMessageType } = require('./util');
const {
getDestinationExternalID,
defaultRequestConfig,
getErrorRespEvents,
} = require('../../util');
const { getDestinationExternalID, defaultRequestConfig } = require('../../util');
const { formatConfig, MAX_LEAD_IDS_SIZE } = require('./config');
const Cache = require('../../util/cache');
const { getAuthToken } = require('../marketo/transform');
Expand Down
Loading
Loading