diff --git a/.eslintrc.json b/.eslintrc.json index 7258c5c536..556470697d 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -26,6 +26,7 @@ "off", { "cases": { "camelCase": true, "pascalCase": true, "kebabCase": true } } ], + "import/no-import-module-exports": "off", "unicorn/no-instanceof-array": "error", "unicorn/no-static-only-class": "error", "unicorn/consistent-destructuring": "error", diff --git a/package-lock.json b/package-lock.json index 65659a745c..fd42692109 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,7 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", - "@rudderstack/integrations-lib": "^0.2.7", + "@rudderstack/integrations-lib": "^0.2.8", "@rudderstack/workflow-engine": "^0.7.5", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", @@ -118,41 +118,6 @@ "typescript": "^5.0.4" } }, - "../rudder-integrations-lib": { - "name": "@rudderstack/integrations-lib", - "version": "0.1.10", - "extraneous": true, - "license": "MIT", - "dependencies": { - "@rudderstack/workflow-engine": "^0.5.7", - "axios": "^1.4.0", - "axios-mock-adapter": "^1.22.0", - "crypto": "^1.0.1", - "get-value": "^3.0.1", - "handlebars": "^4.7.8", - "lodash": "^4.17.21", - "moment": "^2.29.4", - "moment-timezone": "^0.5.43", - "set-value": "^4.1.0", - "sha256": "^0.2.0", - "tslib": "^2.4.0", - "winston": "^3.11.0" - }, - "devDependencies": { - "@types/get-value": "^3.0.3", - "@types/jest": "^29.5.4", - "@types/lodash": "^4.14.195", - "@types/node": "^20.3.3", - "@types/set-value": "^4.0.1", - "@types/sha256": "^0.2.0", - "jest": "^29.4.3", - "pre-commit": "^1.2.2", - "prettier": "^2.8.4", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.1.6" - } - }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "resolved": "https://registry.npmjs.org/@aashutoshrathi/word-wrap/-/word-wrap-1.2.6.tgz", @@ -4472,9 +4437,9 @@ } }, "node_modules/@rudderstack/integrations-lib": { - "version": "0.2.7", - "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.2.7.tgz", - "integrity": "sha512-F0QVIT2vpSeI+GcUk7AwxMJrmM5SsRk8AS6oH4nHkjjfDoKjdh9rrDVzhXKUYF//FAi32ecmSsW+/6ioB8louw==", + "version": "0.2.8", + "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.2.8.tgz", + "integrity": "sha512-5CJoFFCRDhG7busCGVktKqEEXO0DbFqJ56TOT+jyDdoTf8sZ7SsSJ4NCZYmSplZrbQGj2R+aArnQnpxA4hPGmA==", "dependencies": { "axios": "^1.4.0", "axios-mock-adapter": "^1.22.0", diff --git a/package.json b/package.json index c91d836c34..c839bc8acc 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,7 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", - "@rudderstack/integrations-lib": "^0.2.7", + "@rudderstack/integrations-lib": "^0.2.8", "@rudderstack/workflow-engine": "^0.7.5", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", diff --git a/src/cdk/v2/destinations/fullstory/procWorkflow.yaml b/src/cdk/v2/destinations/fullstory/procWorkflow.yaml index 1a54e8688c..26da955623 100644 --- a/src/cdk/v2/destinations/fullstory/procWorkflow.yaml +++ b/src/cdk/v2/destinations/fullstory/procWorkflow.yaml @@ -76,7 +76,7 @@ steps: "use_most_recent": .message.properties.useMostRecent, }; $.context.payload.user = { - "id": .message.properties.userId ?? .message.userId, + "uid": .message.properties.userId ?? .message.userId, } - name: cleanPayload diff --git a/src/cdk/v2/handler.ts b/src/cdk/v2/handler.ts index edd14e7298..c437247f74 100644 --- a/src/cdk/v2/handler.ts +++ b/src/cdk/v2/handler.ts @@ -1,9 +1,9 @@ import { - WorkflowEngine, - WorkflowEngineFactory, - TemplateType, ExecutionBindings, StepOutput, + TemplateType, + WorkflowEngine, + WorkflowEngineFactory, } from '@rudderstack/workflow-engine'; import { FixMe } from '../../util/types'; @@ -11,9 +11,9 @@ import tags from '../../v0/util/tags'; import { getErrorInfo, + getPlatformBindingsPaths, getRootPathForDestination, getWorkflowPath, - getPlatformBindingsPaths, isCdkV2Destination, } from './utils'; @@ -82,10 +82,12 @@ export async function processCdkV2Workflow( destType: string, parsedEvent: FixMe, feature: string, + logger: FixMe, requestMetadata: NonNullable = {}, bindings: Record = {}, ) { try { + logger.debug(`Processing cdkV2 workflow`); const workflowEngine = await getCachedWorkflowEngine(destType, feature, bindings); return await executeWorkflow(workflowEngine, parsedEvent, requestMetadata); } catch (error) { diff --git a/src/controllers/bulkUpload.ts b/src/controllers/bulkUpload.ts index dbd77dc07f..28556dd5df 100644 --- a/src/controllers/bulkUpload.ts +++ b/src/controllers/bulkUpload.ts @@ -1,10 +1,10 @@ /* eslint-disable global-require, import/no-dynamic-require, @typescript-eslint/no-unused-vars */ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { client as errNotificationClient } from '../util/errorNotifier'; -import logger from '../logger'; import { + getDestFileUploadHandler, getJobStatusHandler, getPollStatusHandler, - getDestFileUploadHandler, } from '../util/fetchDestinationHandlers'; import { CatchErr, ContextBodySimple } from '../util/types'; // TODO: To be refactored and redisgned @@ -31,10 +31,7 @@ const getReqMetadata = (ctx) => { }; export const fileUpload = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /fileUpload route', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Bulk-Upload): Request to transformer:: /fileUpload route', ctx.request.body); const getReqMetadataFileUpload = () => { try { const reqBody = ctx.request.body; @@ -69,18 +66,12 @@ export const fileUpload = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Response from transformer:: /fileUpload route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Response from transformer:: /fileUpload route', ctx.body); return ctx.body; }; export const pollStatus = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /pollStatus route', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Bulk-Upload): Request to transformer:: /pollStatus route', ctx.request.body); const { destType }: ContextBodySimple = ctx.request.body; const destFileUploadHandler = getPollStatusHandler('v0', destType.toLowerCase()); @@ -104,17 +95,14 @@ export const pollStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /pollStatus route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /pollStatus route', ctx.body); return ctx.body; }; export const getWarnJobStatus = async (ctx) => { logger.debug( 'Native(Bulk-Upload): Request to transformer:: /getWarningJobs route', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { destType }: ContextBodySimple = ctx.request.body; @@ -140,17 +128,14 @@ export const getWarnJobStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /getWarningJobs route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /getWarningJobs route', ctx.body); return ctx.body; }; export const getFailedJobStatus = async (ctx) => { logger.debug( 'Native(Bulk-Upload): Request to transformer:: /getFailedJobs route', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { destType }: ContextBodySimple = ctx.request.body; @@ -176,9 +161,6 @@ export const getFailedJobStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /getFailedJobs route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /getFailedJobs route', ctx.body); return ctx.body; }; diff --git a/src/controllers/delivery.ts b/src/controllers/delivery.ts index 4334dc33b2..8ac7c9902a 100644 --- a/src/controllers/delivery.ts +++ b/src/controllers/delivery.ts @@ -1,28 +1,30 @@ /* eslint-disable prefer-destructuring */ /* eslint-disable sonarjs/no-duplicate-string */ +import { + isDefinedAndNotNullAndNotEmpty, + structuredLogger as logger, +} from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { isDefinedAndNotNullAndNotEmpty } from '@rudderstack/integrations-lib'; +import { ServiceSelector } from '../helpers/serviceSelector'; +import { DeliveryTestService } from '../services/delivertTest/deliveryTest'; +import { DestinationPostTransformationService } from '../services/destination/postTransformation'; import { MiscService } from '../services/misc'; import { - DeliveryV1Response, DeliveryV0Response, + DeliveryV1Response, ProcessorTransformationOutput, ProxyV0Request, ProxyV1Request, } from '../types/index'; -import { ServiceSelector } from '../helpers/serviceSelector'; -import { DeliveryTestService } from '../services/delivertTest/deliveryTest'; -import { ControllerUtility } from './util'; -import logger from '../logger'; -import { DestinationPostTransformationService } from '../services/destination/postTransformation'; -import tags from '../v0/util/tags'; import { FixMe } from '../util/types'; +import tags from '../v0/util/tags'; +import { ControllerUtility } from './util'; const NON_DETERMINABLE = 'Non-determinable'; export class DeliveryController { public static async deliverToDestination(ctx: Context) { - logger.debug('Native(Delivery):: Request to transformer::', JSON.stringify(ctx.request.body)); + logger.info('Native(Delivery):: Request to transformer::', ctx.request.body); let deliveryResponse: DeliveryV0Response; const requestMetadata = MiscService.getRequestMetadata(ctx); const deliveryRequest = ctx.request.body as ProxyV0Request; @@ -52,12 +54,12 @@ export class DeliveryController { ctx.body = { output: deliveryResponse }; ControllerUtility.deliveryPostProcess(ctx, deliveryResponse.status); - logger.debug('Native(Delivery):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery):: Response from transformer::', ctx.body); return ctx; } public static async deliverToDestinationV1(ctx: Context) { - logger.debug('Native(Delivery):: Request to transformer::', JSON.stringify(ctx.request.body)); + logger.debug('Native(Delivery):: Request to transformer::', ctx.request.body); let deliveryResponse: DeliveryV1Response; const requestMetadata = MiscService.getRequestMetadata(ctx); const deliveryRequest = ctx.request.body as ProxyV1Request; @@ -91,15 +93,12 @@ export class DeliveryController { ControllerUtility.deliveryPostProcess(ctx); } - logger.debug('Native(Delivery):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery):: Response from transformer::', ctx.body); return ctx; } public static async testDestinationDelivery(ctx: Context) { - logger.debug( - 'Native(Delivery-Test):: Request to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Delivery-Test):: Request to transformer::', ctx.request.body); const { destination }: { destination: string } = ctx.params; const { version }: { version: string } = ctx.params; const { @@ -117,7 +116,7 @@ export class DeliveryController { ); ctx.body = { output: response }; ControllerUtility.postProcess(ctx); - logger.debug('Native(Delivery-Test):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery-Test):: Response from transformer::', ctx.body); return ctx; } } diff --git a/src/controllers/destination.ts b/src/controllers/destination.ts index d8b3c94524..35606ea62e 100644 --- a/src/controllers/destination.ts +++ b/src/controllers/destination.ts @@ -1,29 +1,27 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { MiscService } from '../services/misc'; -import { DestinationPreTransformationService } from '../services/destination/preTransformation'; +import { ServiceSelector } from '../helpers/serviceSelector'; import { DestinationPostTransformationService } from '../services/destination/postTransformation'; +import { DestinationPreTransformationService } from '../services/destination/preTransformation'; +import { MiscService } from '../services/misc'; import { + ErrorDetailer, ProcessorTransformationRequest, - RouterTransformationRequest, ProcessorTransformationResponse, + RouterTransformationRequest, RouterTransformationResponse, } from '../types/index'; -import { ServiceSelector } from '../helpers/serviceSelector'; -import { ControllerUtility } from './util'; +import { DynamicConfigParser } from '../util/dynamicConfigParser'; import stats from '../util/stats'; -import logger from '../logger'; import { getIntegrationVersion } from '../util/utils'; -import tags from '../v0/util/tags'; -import { DynamicConfigParser } from '../util/dynamicConfigParser'; import { checkInvalidRtTfEvents } from '../v0/util'; +import tags from '../v0/util/tags'; +import { ControllerUtility } from './util'; export class DestinationController { public static async destinationTransformAtProcessor(ctx: Context) { const startTime = new Date(); - logger.debug( - 'Native(Process-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Process-Transform):: Requst to transformer::', ctx.request.body); let resplist: ProcessorTransformationResponse[]; const requestMetadata = MiscService.getRequestMetadata(ctx); let events = ctx.request.body as ProcessorTransformationRequest[]; @@ -35,6 +33,9 @@ export class DestinationController { ...metaTags, }); const integrationService = ServiceSelector.getDestinationService(events); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); try { integrationService.init(); events = DestinationPreTransformationService.preProcess( @@ -50,6 +51,7 @@ export class DestinationController { destination, version, requestMetadata, + loggerWithCtx, ); } catch (error: any) { resplist = events.map((ev) => { @@ -69,10 +71,7 @@ export class DestinationController { } ctx.body = resplist; ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Process-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Process-Transform):: Response from transformer::', ctx.body); stats.histogram('dest_transform_output_events', resplist.length, { destination, version, @@ -94,10 +93,7 @@ export class DestinationController { public static async destinationTransformAtRouter(ctx: Context) { const startTime = new Date(); - logger.debug( - 'Native(Router-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Router-Transform):: Requst to transformer::', ctx.request.body); const requestMetadata = MiscService.getRequestMetadata(ctx); const routerRequest = ctx.request.body as RouterTransformationRequest; const destination = routerRequest.destType; @@ -117,6 +113,9 @@ export class DestinationController { return ctx; } const metaTags = MiscService.getMetaTags(events[0].metadata); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); stats.histogram('dest_transform_input_events', events.length, { destination, version: 'v0', @@ -133,6 +132,7 @@ export class DestinationController { destination, getIntegrationVersion(), requestMetadata, + loggerWithCtx, ); } catch (error: any) { const metaTO = integrationService.getTags( @@ -155,10 +155,7 @@ export class DestinationController { version: 'v0', ...metaTags, }); - logger.debug( - 'Native(Router-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Router-Transform):: Response from transformer::', ctx.body); stats.timing('dest_transform_request_latency', startTime, { destination, version: 'v0', @@ -169,15 +166,15 @@ export class DestinationController { } public static batchProcess(ctx: Context) { - logger.debug( - 'Native(Process-Transform-Batch):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.info('Native(Process-Transform-Batch):: Requst to transformer::', ctx.request.body); const startTime = new Date(); const requestMetadata = MiscService.getRequestMetadata(ctx); const routerRequest = ctx.request.body as RouterTransformationRequest; const destination = routerRequest.destType; let events = routerRequest.input; + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); const integrationService = ServiceSelector.getDestinationService(events); try { events = DestinationPreTransformationService.preProcess(events, ctx); @@ -187,6 +184,7 @@ export class DestinationController { destination, getIntegrationVersion(), requestMetadata, + loggerWithCtx, ); ctx.body = resplist; } catch (error: any) { @@ -204,10 +202,7 @@ export class DestinationController { ctx.body = [errResp]; } ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Process-Transform-Batch):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Process-Transform-Batch):: Response from transformer::', ctx.body); stats.timing('dest_transform_request_latency', startTime, { destination, feature: tags.FEATURES.BATCH, diff --git a/src/controllers/regulation.ts b/src/controllers/regulation.ts index 318b5ed4e7..4b8f87e3fa 100644 --- a/src/controllers/regulation.ts +++ b/src/controllers/regulation.ts @@ -1,19 +1,16 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import logger from '../logger'; -import { UserDeletionRequest, UserDeletionResponse } from '../types'; import { ServiceSelector } from '../helpers/serviceSelector'; -import tags from '../v0/util/tags'; -import stats from '../util/stats'; import { DestinationPostTransformationService } from '../services/destination/postTransformation'; +import { UserDeletionRequest, UserDeletionResponse } from '../types'; +import stats from '../util/stats'; +import tags from '../v0/util/tags'; // eslint-disable-next-line @typescript-eslint/no-unused-vars import { CatchErr } from '../util/types'; export class RegulationController { public static async deleteUsers(ctx: Context) { - logger.debug( - 'Native(Process-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Process-Transform):: Requst to transformer::', ctx.request.body); const startTime = new Date(); let rudderDestInfo: any; try { diff --git a/src/controllers/source.ts b/src/controllers/source.ts index ef5483a756..e1a4931371 100644 --- a/src/controllers/source.ts +++ b/src/controllers/source.ts @@ -1,20 +1,18 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { MiscService } from '../services/misc'; import { ServiceSelector } from '../helpers/serviceSelector'; -import { ControllerUtility } from './util'; -import logger from '../logger'; +import { MiscService } from '../services/misc'; import { SourcePostTransformationService } from '../services/source/postTransformation'; +import { ControllerUtility } from './util'; export class SourceController { public static async sourceTransform(ctx: Context) { - logger.debug( - 'Native(Source-Transform):: Request to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Source-Transform):: Request to transformer::', ctx.request.body); const requestMetadata = MiscService.getRequestMetadata(ctx); const events = ctx.request.body as object[]; const { version, source }: { version: string; source: string } = ctx.params; const integrationService = ServiceSelector.getNativeSourceService(); + const loggerWithCtx = logger.child({ version, source }); try { const { implementationVersion, input } = ControllerUtility.adaptInputToVersion( source, @@ -26,6 +24,7 @@ export class SourceController { source, implementationVersion, requestMetadata, + loggerWithCtx, ); ctx.body = resplist; } catch (err: any) { @@ -34,10 +33,7 @@ export class SourceController { ctx.body = [resp]; } ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Source-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Source-Transform):: Response from transformer::', ctx.body); return ctx; } } diff --git a/src/controllers/userTransform.ts b/src/controllers/userTransform.ts index 3e01686a52..0e288c6f04 100644 --- a/src/controllers/userTransform.ts +++ b/src/controllers/userTransform.ts @@ -1,10 +1,10 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { ProcessorTransformationRequest, UserTransformationServiceResponse } from '../types/index'; import { UserTransformService } from '../services/userTransform'; -import logger from '../logger'; +import { ProcessorTransformationRequest, UserTransformationServiceResponse } from '../types/index'; import { - setupUserTransformHandler, extractLibraries, + setupUserTransformHandler, validateCode, } from '../util/customTransformer'; import { ControllerUtility } from './util'; @@ -13,7 +13,7 @@ export class UserTransformController { public static async transform(ctx: Context) { logger.debug( '(User transform - router:/customTransform ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const requestSize = Number(ctx.request.get('content-length')); const events = ctx.request.body as ProcessorTransformationRequest[]; @@ -23,7 +23,7 @@ export class UserTransformController { ControllerUtility.postProcess(ctx, processedRespone.retryStatus); logger.debug( '(User transform - router:/customTransform ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -31,7 +31,7 @@ export class UserTransformController { public static async testTransform(ctx: Context) { logger.debug( '(User transform - router:/transformation/test ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { events, trRevCode, libraryVersionIDs = [] } = ctx.request.body as any; const response = await UserTransformService.testTransformRoutine( @@ -43,7 +43,7 @@ export class UserTransformController { ControllerUtility.postProcess(ctx, response.status); logger.debug( '(User transform - router:/transformation/test ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -51,7 +51,7 @@ export class UserTransformController { public static async testTransformLibrary(ctx: Context) { logger.debug( '(User transform - router:/transformationLibrary/test ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { code, language = 'javascript' } = ctx.request.body as any; @@ -66,7 +66,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/transformationLibrary/test ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -74,7 +74,7 @@ export class UserTransformController { public static async testTransformSethandle(ctx: Context) { logger.debug( '(User transform - router:/transformation/sethandle ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { trRevCode, libraryVersionIDs = [] } = ctx.request.body as any; @@ -96,7 +96,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/transformation/sethandle ):: Response from transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); return ctx; } @@ -104,7 +104,7 @@ export class UserTransformController { public static async extractLibhandle(ctx: Context) { logger.debug( '(User transform - router:/extractLibs ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { @@ -134,7 +134,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/extractLibs ):: Response from transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); return ctx; } diff --git a/src/index.ts b/src/index.ts index 36f32f1aed..5557994b2e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,14 +1,14 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import dotenv from 'dotenv'; +import gracefulShutdown from 'http-graceful-shutdown'; import Koa from 'koa'; import bodyParser from 'koa-bodyparser'; -import gracefulShutdown from 'http-graceful-shutdown'; -import dotenv from 'dotenv'; -import logger from './logger'; -import cluster from './util/cluster'; +import { addRequestSizeMiddleware, addStatMiddleware, initPyroscope } from './middleware'; +import { addSwaggerRoutes, applicationRoutes } from './routes'; import { metricsRouter } from './routes/metricsRouter'; -import { addStatMiddleware, addRequestSizeMiddleware, initPyroscope } from './middleware'; -import { logProcessInfo } from './util/utils'; -import { applicationRoutes, addSwaggerRoutes } from './routes'; +import cluster from './util/cluster'; import { RedisDB } from './util/redis/redisConnector'; +import { logProcessInfo } from './util/utils'; dotenv.config(); const clusterEnabled = process.env.CLUSTER_ENABLED !== 'false'; diff --git a/src/interfaces/DestinationService.ts b/src/interfaces/DestinationService.ts index 4947089b5d..5d7596dac5 100644 --- a/src/interfaces/DestinationService.ts +++ b/src/interfaces/DestinationService.ts @@ -1,14 +1,14 @@ import { DeliveryV0Response, + DeliveryV1Response, MetaTransferObject, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, RouterTransformationRequestData, RouterTransformationResponse, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - DeliveryV1Response, } from '../types/index'; export interface DestinationService { @@ -28,6 +28,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): Promise; doRouterTransformation( @@ -35,6 +36,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): Promise; doBatchTransformation( @@ -42,6 +44,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): RouterTransformationResponse[]; deliver( diff --git a/src/interfaces/SourceService.ts b/src/interfaces/SourceService.ts index c7de8cfe8b..fab6490264 100644 --- a/src/interfaces/SourceService.ts +++ b/src/interfaces/SourceService.ts @@ -1,4 +1,5 @@ import { MetaTransferObject, SourceTransformationResponse } from '../types/index'; +import { FixMe } from '../util/types'; export interface SourceService { getTags(): MetaTransferObject; @@ -8,5 +9,6 @@ export interface SourceService { sourceType: string, version: string, requestMetadata: NonNullable, + logger: FixMe, ): Promise; } diff --git a/src/services/comparator.ts b/src/services/comparator.ts index 36cb0ebd5a..511436dfd1 100644 --- a/src/services/comparator.ts +++ b/src/services/comparator.ts @@ -1,4 +1,5 @@ /* eslint-disable class-methods-use-this */ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { DestinationService } from '../interfaces/DestinationService'; import { DeliveryV0Response, @@ -14,10 +15,9 @@ import { UserDeletionRequest, UserDeletionResponse, } from '../types'; -import tags from '../v0/util/tags'; -import stats from '../util/stats'; -import logger from '../logger'; import { CommonUtils } from '../util/common'; +import stats from '../util/stats'; +import tags from '../v0/util/tags'; const NS_PER_SEC = 1e9; @@ -204,6 +204,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + logger, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; @@ -262,6 +263,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + logger, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; @@ -320,6 +322,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + {}, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; diff --git a/src/services/destination/__tests__/nativeIntegration.test.ts b/src/services/destination/__tests__/nativeIntegration.test.ts index 59c8b41881..85d099d292 100644 --- a/src/services/destination/__tests__/nativeIntegration.test.ts +++ b/src/services/destination/__tests__/nativeIntegration.test.ts @@ -1,11 +1,12 @@ -import { NativeIntegrationDestinationService } from '../nativeIntegration'; -import { DestinationPostTransformationService } from '../postTransformation'; +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import { FetchHandler } from '../../../helpers/fetchHandlers'; import { - ProcessorTransformationRequest, ProcessorTransformationOutput, + ProcessorTransformationRequest, ProcessorTransformationResponse, } from '../../../types/index'; -import { FetchHandler } from '../../../helpers/fetchHandlers'; +import { NativeIntegrationDestinationService } from '../nativeIntegration'; +import { DestinationPostTransformationService } from '../postTransformation'; afterEach(() => { jest.clearAllMocks(); @@ -47,6 +48,7 @@ describe('NativeIntegration Service', () => { destType, version, requestMetadata, + logger, ); expect(resp).toEqual(tresponse); @@ -77,6 +79,7 @@ describe('NativeIntegration Service', () => { destType, version, requestMetadata, + logger, ); const expected = [ diff --git a/src/services/destination/cdkV2Integration.ts b/src/services/destination/cdkV2Integration.ts index c18a5cd936..a649da9154 100644 --- a/src/services/destination/cdkV2Integration.ts +++ b/src/services/destination/cdkV2Integration.ts @@ -1,27 +1,28 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable class-methods-use-this */ -import groupBy from 'lodash/groupBy'; import { TransformationError } from '@rudderstack/integrations-lib'; +import groupBy from 'lodash/groupBy'; import { processCdkV2Workflow } from '../../cdk/v2/handler'; import { DestinationService } from '../../interfaces/DestinationService'; import { DeliveryV0Response, + DeliveryV1Response, ErrorDetailer, MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, RouterTransformationRequestData, RouterTransformationResponse, - ProcessorTransformationOutput, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - DeliveryV1Response, } from '../../types/index'; +import stats from '../../util/stats'; +import { CatchErr, FixMe } from '../../util/types'; import tags from '../../v0/util/tags'; +import { MiscService } from '../misc'; import { DestinationPostTransformationService } from './postTransformation'; -import stats from '../../util/stats'; -import { CatchErr } from '../../util/types'; export class CDKV2DestinationService implements DestinationService { public init() {} @@ -55,10 +56,19 @@ export class CDKV2DestinationService implements DestinationService { destinationType: string, _version: string, requestMetadata: NonNullable, + logger: any, ): Promise { // TODO: Change the promise type const respList: ProcessorTransformationResponse[][] = await Promise.all( events.map(async (event) => { + const metaTo = this.getTags( + destinationType, + event.metadata.destinationId, + event.metadata.workspaceId, + tags.FEATURES.PROCESSOR, + ); + metaTo.metadata = event.metadata; + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTo.errorDetails) }); try { const transformedPayloads: | ProcessorTransformationOutput @@ -66,9 +76,9 @@ export class CDKV2DestinationService implements DestinationService { destinationType, event, tags.FEATURES.PROCESSOR, + loggerWithCtx, requestMetadata, ); - stats.increment('event_transform_success', { destType: destinationType, module: tags.MODULES.DESTINATION, @@ -85,13 +95,6 @@ export class CDKV2DestinationService implements DestinationService { undefined, ); } catch (error: CatchErr) { - const metaTo = this.getTags( - destinationType, - event.metadata.destinationId, - event.metadata.workspaceId, - tags.FEATURES.PROCESSOR, - ); - metaTo.metadata = event.metadata; const erroredResp = DestinationPostTransformationService.handleProcessorTransformFailureEvents( error, @@ -112,6 +115,7 @@ export class CDKV2DestinationService implements DestinationService { destinationType: string, _version: string, requestMetadata: NonNullable, + logger: FixMe, ): Promise { const allDestEvents: object = groupBy( events, @@ -127,12 +131,16 @@ export class CDKV2DestinationService implements DestinationService { tags.FEATURES.ROUTER, ); metaTo.metadata = destInputArray[0].metadata; + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTo.errorDetails), + }); try { const doRouterTransformationResponse: RouterTransformationResponse[] = await processCdkV2Workflow( destinationType, destInputArray, tags.FEATURES.ROUTER, + loggerWithCtx, requestMetadata, ); return DestinationPostTransformationService.handleRouterTransformSuccessEvents( diff --git a/src/services/destination/nativeIntegration.ts b/src/services/destination/nativeIntegration.ts index 2bb82fc602..0bc9308fcd 100644 --- a/src/services/destination/nativeIntegration.ts +++ b/src/services/destination/nativeIntegration.ts @@ -1,31 +1,32 @@ /* eslint-disable prefer-destructuring */ /* eslint-disable sonarjs/no-duplicate-string */ /* eslint-disable @typescript-eslint/no-unused-vars */ -import groupBy from 'lodash/groupBy'; import cloneDeep from 'lodash/cloneDeep'; +import groupBy from 'lodash/groupBy'; +import networkHandlerFactory from '../../adapters/networkHandlerFactory'; +import { FetchHandler } from '../../helpers/fetchHandlers'; import { DestinationService } from '../../interfaces/DestinationService'; import { + DeliveryJobState, DeliveryV0Response, + DeliveryV1Response, ErrorDetailer, MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, + ProxyV0Request, + ProxyV1Request, RouterTransformationRequestData, RouterTransformationResponse, - ProcessorTransformationOutput, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - ProxyV0Request, - ProxyV1Request, - DeliveryV1Response, - DeliveryJobState, } from '../../types/index'; -import { DestinationPostTransformationService } from './postTransformation'; -import networkHandlerFactory from '../../adapters/networkHandlerFactory'; -import { FetchHandler } from '../../helpers/fetchHandlers'; -import tags from '../../v0/util/tags'; import stats from '../../util/stats'; +import tags from '../../v0/util/tags'; +import { MiscService } from '../misc'; +import { DestinationPostTransformationService } from './postTransformation'; export class NativeIntegrationDestinationService implements DestinationService { public init() {} @@ -59,27 +60,33 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: any, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); const respList: ProcessorTransformationResponse[][] = await Promise.all( events.map(async (event) => { + const metaTO = this.getTags( + destinationType, + event.metadata?.destinationId, + event.metadata?.workspaceId, + tags.FEATURES.PROCESSOR, + ); + metaTO.metadata = event.metadata; + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTO.errorDetails) }); try { const transformedPayloads: | ProcessorTransformationOutput - | ProcessorTransformationOutput[] = await destHandler.process(event, requestMetadata); + | ProcessorTransformationOutput[] = await destHandler.process( + event, + requestMetadata, + loggerWithCtx, + ); return DestinationPostTransformationService.handleProcessorTransformSucessEvents( event, transformedPayloads, destHandler, ); } catch (error: any) { - const metaTO = this.getTags( - destinationType, - event.metadata?.destinationId, - event.metadata?.workspaceId, - tags.FEATURES.PROCESSOR, - ); - metaTO.metadata = event.metadata; const erroredResp = DestinationPostTransformationService.handleProcessorTransformFailureEvents( error, @@ -97,6 +104,7 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: any, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); const allDestEvents: NonNullable = groupBy( @@ -112,9 +120,16 @@ export class NativeIntegrationDestinationService implements DestinationService { destInputArray[0].metadata?.workspaceId, tags.FEATURES.ROUTER, ); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTO.errorDetails), + }); try { const doRouterTransformationResponse: RouterTransformationResponse[] = - await destHandler.processRouterDest(cloneDeep(destInputArray), requestMetadata); + await destHandler.processRouterDest( + cloneDeep(destInputArray), + requestMetadata, + loggerWithCtx, + ); metaTO.metadata = destInputArray[0].metadata; return DestinationPostTransformationService.handleRouterTransformSuccessEvents( doRouterTransformationResponse, @@ -141,6 +156,7 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: any, requestMetadata: NonNullable, + logger: any, ): RouterTransformationResponse[] { const destHandler = FetchHandler.getDestHandler(destinationType, version); if (!destHandler.batch) { @@ -152,20 +168,24 @@ export class NativeIntegrationDestinationService implements DestinationService { ); const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents); const response = groupedEvents.map((destEvents) => { + const metaTO = this.getTags( + destinationType, + destEvents[0].metadata.destinationId, + destEvents[0].metadata.workspaceId, + tags.FEATURES.BATCH, + ); + metaTO.metadatas = events.map((event) => event.metadata); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTO.errorDetails), + }); try { const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch( destEvents, requestMetadata, + loggerWithCtx, ); return destBatchedRequests; } catch (error: any) { - const metaTO = this.getTags( - destinationType, - destEvents[0].metadata.destinationId, - destEvents[0].metadata.workspaceId, - tags.FEATURES.BATCH, - ); - metaTO.metadatas = events.map((event) => event.metadata); const errResp = DestinationPostTransformationService.handleBatchTransformFailureEvents( error, metaTO, @@ -264,6 +284,7 @@ export class NativeIntegrationDestinationService implements DestinationService { error: `${destType}: Doesn't support deletion of users`, } as UserDeletionResponse; } + const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION); try { const result: UserDeletionResponse = await destUserDeletionHandler.processDeleteUsers({ ...request, @@ -276,7 +297,6 @@ export class NativeIntegrationDestinationService implements DestinationService { }); return result; } catch (error: any) { - const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION); return DestinationPostTransformationService.handleUserDeletionFailureEvents( error, metaTO, diff --git a/src/services/destination/postTransformation.ts b/src/services/destination/postTransformation.ts index 161547683b..40cee61e66 100644 --- a/src/services/destination/postTransformation.ts +++ b/src/services/destination/postTransformation.ts @@ -1,24 +1,30 @@ /* eslint-disable no-param-reassign */ +import { PlatformError } from '@rudderstack/integrations-lib'; import cloneDeep from 'lodash/cloneDeep'; -import isObject from 'lodash/isObject'; import isEmpty from 'lodash/isEmpty'; -import { PlatformError } from '@rudderstack/integrations-lib'; +import isObject from 'lodash/isObject'; import { + DeliveryJobState, + DeliveryV0Response, + DeliveryV1Response, + MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, RouterTransformationResponse, - ProcessorTransformationOutput, - DeliveryV0Response, - MetaTransferObject, UserDeletionResponse, - DeliveryV1Response, - DeliveryJobState, } from '../../types/index'; -import { generateErrorObject } from '../../v0/util'; -import { ErrorReportingService } from '../errorReporting'; -import tags from '../../v0/util/tags'; import stats from '../../util/stats'; import { FixMe } from '../../util/types'; +import { generateErrorObject } from '../../v0/util'; +import tags from '../../v0/util/tags'; +import { ErrorReportingService } from '../errorReporting'; +import { MiscService } from '../misc'; + +const defaultErrorMessages = { + router: '[Router Transform] Error occurred while processing the payload.', + delivery: '[Delivery] Error occured while processing payload', +} as const; export class DestinationPostTransformationService { public static handleProcessorTransformSucessEvents( @@ -62,6 +68,10 @@ export class DestinationPostTransformationService { error: errObj.message || '[Processor Transform] Error occurred while processing the payload.', statTags: errObj.statTags, } as ProcessorTransformationResponse; + MiscService.logError( + errObj.message || '[Processor Transform] Error occurred while processing the payload.', + metaTo.errorDetails, + ); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -99,6 +109,7 @@ export class DestinationPostTransformationService { ...resp.statTags, ...metaTo.errorDetails, }; + MiscService.logError(resp.error || defaultErrorMessages.router, metaTo.errorDetails); stats.increment('event_transform_failure', metaTo.errorDetails); } else { stats.increment('event_transform_success', { @@ -124,9 +135,10 @@ export class DestinationPostTransformationService { metadata: metaTo.metadatas, batched: false, statusCode: errObj.status, - error: errObj.message || '[Router Transform] Error occurred while processing the payload.', + error: errObj.message || defaultErrorMessages.router, statTags: errObj.statTags, } as RouterTransformationResponse; + MiscService.logError(errObj.message || defaultErrorMessages.router, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); stats.increment('event_transform_failure', metaTo.errorDetails); return resp; @@ -141,9 +153,10 @@ export class DestinationPostTransformationService { metadata: metaTo.metadatas, batched: false, statusCode: 500, // for batch we should consider code error hence keeping retryable - error: errObj.message || '[Batch Transform] Error occurred while processing payload.', + error: errObj.message || defaultErrorMessages.delivery, statTags: errObj.statTags, } as RouterTransformationResponse; + MiscService.logError(error as string, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -174,6 +187,10 @@ export class DestinationPostTransformationService { const errObj = generateErrorObject(error, metaTo.errorDetails, false); const metadataArray = metaTo.metadatas; if (!Array.isArray(metadataArray)) { + MiscService.logError( + 'Proxy v1 endpoint error : metadataArray is not an array', + metaTo.errorDetails, + ); // Panic throw new PlatformError('Proxy v1 endpoint error : metadataArray is not an array'); } @@ -182,7 +199,7 @@ export class DestinationPostTransformationService { error: JSON.stringify(error.destinationResponse?.response) || errObj.message || - '[Delivery] Error occured while processing payload', + defaultErrorMessages.delivery, statusCode: errObj.status, metadata, } as DeliveryJobState; @@ -198,7 +215,7 @@ export class DestinationPostTransformationService { authErrorCategory: errObj.authErrorCategory, }), } as DeliveryV1Response; - + MiscService.logError(errObj.message, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -216,6 +233,7 @@ export class DestinationPostTransformationService { authErrorCategory: errObj.authErrorCategory, }), } as UserDeletionResponse; + MiscService.logError(errObj.message, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } diff --git a/src/services/misc.ts b/src/services/misc.ts index e0953d08bf..3df1196c1d 100644 --- a/src/services/misc.ts +++ b/src/services/misc.ts @@ -1,10 +1,11 @@ /* eslint-disable global-require, import/no-dynamic-require */ +import { LoggableExtraData, structuredLogger as logger } from '@rudderstack/integrations-lib'; import fs from 'fs'; -import path from 'path'; import { Context } from 'koa'; +import path from 'path'; import { DestHandlerMap } from '../constants/destinationCanonicalNames'; -import { Metadata } from '../types'; import { getCPUProfile, getHeapProfile } from '../middleware'; +import { ErrorDetailer, Metadata } from '../types'; export class MiscService { public static getDestHandler(dest: string, version: string) { @@ -74,4 +75,21 @@ export class MiscService { public static async getHeapProfile() { return getHeapProfile(); } + + public static getLoggableData(errorDetailer: ErrorDetailer): Partial { + return { + ...(errorDetailer?.destinationId && { destinationId: errorDetailer.destinationId }), + ...(errorDetailer?.sourceId && { sourceId: errorDetailer.sourceId }), + ...(errorDetailer?.workspaceId && { workspaceId: errorDetailer.workspaceId }), + ...(errorDetailer?.destType && { destType: errorDetailer.destType }), + ...(errorDetailer?.module && { module: errorDetailer.module }), + ...(errorDetailer?.implementation && { implementation: errorDetailer.implementation }), + ...(errorDetailer?.feature && { feature: errorDetailer.feature }), + }; + } + + public static logError(message: string, errorDetailer: ErrorDetailer) { + const loggableExtraData: Partial = this.getLoggableData(errorDetailer); + logger.errorw(message || '', loggableExtraData); + } } diff --git a/src/services/source/__tests__/nativeIntegration.test.ts b/src/services/source/__tests__/nativeIntegration.test.ts index bb40438811..77e355fd1a 100644 --- a/src/services/source/__tests__/nativeIntegration.test.ts +++ b/src/services/source/__tests__/nativeIntegration.test.ts @@ -1,8 +1,9 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import { FetchHandler } from '../../../helpers/fetchHandlers'; +import { RudderMessage, SourceTransformationResponse } from '../../../types/index'; +import stats from '../../../util/stats'; import { NativeIntegrationSourceService } from '../nativeIntegration'; import { SourcePostTransformationService } from '../postTransformation'; -import { SourceTransformationResponse, RudderMessage } from '../../../types/index'; -import stats from '../../../util/stats'; -import { FetchHandler } from '../../../helpers/fetchHandlers'; afterEach(() => { jest.clearAllMocks(); @@ -43,7 +44,13 @@ describe('NativeIntegration Source Service', () => { }); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const resp = await service.sourceTransformRoutine( + events, + sourceType, + version, + requestMetadata, + logger, + ); expect(resp).toEqual(tresponse); @@ -80,7 +87,13 @@ describe('NativeIntegration Source Service', () => { jest.spyOn(stats, 'increment').mockImplementation(() => {}); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const resp = await service.sourceTransformRoutine( + events, + sourceType, + version, + requestMetadata, + logger, + ); expect(resp).toEqual(tresponse); diff --git a/src/services/source/nativeIntegration.ts b/src/services/source/nativeIntegration.ts index 6eaef2f835..2ecfc30066 100644 --- a/src/services/source/nativeIntegration.ts +++ b/src/services/source/nativeIntegration.ts @@ -1,3 +1,4 @@ +import { FetchHandler } from '../../helpers/fetchHandlers'; import { SourceService } from '../../interfaces/SourceService'; import { ErrorDetailer, @@ -5,11 +6,11 @@ import { RudderMessage, SourceTransformationResponse, } from '../../types/index'; +import stats from '../../util/stats'; import { FixMe } from '../../util/types'; -import { SourcePostTransformationService } from './postTransformation'; -import { FetchHandler } from '../../helpers/fetchHandlers'; import tags from '../../v0/util/tags'; -import stats from '../../util/stats'; +import { MiscService } from '../misc'; +import { SourcePostTransformationService } from './postTransformation'; export class NativeIntegrationSourceService implements SourceService { public getTags(): MetaTransferObject { @@ -31,20 +32,23 @@ export class NativeIntegrationSourceService implements SourceService { version: string, // eslint-disable-next-line @typescript-eslint/no-unused-vars _requestMetadata: NonNullable, + logger: FixMe, ): Promise { const sourceHandler = FetchHandler.getSourceHandler(sourceType, version); + const metaTO = this.getTags(); + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTO.errorDetails) }); const respList: SourceTransformationResponse[] = await Promise.all( sourceEvents.map(async (sourceEvent) => { try { const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = - await sourceHandler.process(sourceEvent); + await sourceHandler.process(sourceEvent, loggerWithCtx); return SourcePostTransformationService.handleSuccessEventsSource(respEvents); } catch (error: FixMe) { - const metaTO = this.getTags(); stats.increment('source_transform_errors', { source: sourceType, version, }); + logger.debug('Error during source Transform', error); return SourcePostTransformationService.handleFailureEventsSource(error, metaTO); } }), diff --git a/src/util/openfaas/faasApi.js b/src/util/openfaas/faasApi.js index 4db5e2a81c..f8f830f6e4 100644 --- a/src/util/openfaas/faasApi.js +++ b/src/util/openfaas/faasApi.js @@ -2,6 +2,13 @@ const axios = require('axios'); const { RespStatusError, RetryRequestError } = require('../utils'); const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080'; +const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || ''; +const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || ''; + +const basicAuth = { + username: OPENFAAS_GATEWAY_USERNAME, + password: OPENFAAS_GATEWAY_PASSWORD, +}; const parseAxiosError = (error) => { if (error.response) { @@ -21,7 +28,7 @@ const deleteFunction = async (functionName) => new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios - .delete(url, { data: { functionName } }) + .delete(url, { data: { functionName }, auth: basicAuth }) .then(() => resolve()) .catch((err) => reject(parseAxiosError(err))); }); @@ -30,7 +37,7 @@ const getFunction = async (functionName) => new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/function/${functionName}`; axios - .get(url) + .get(url, { auth: basicAuth }) .then((resp) => resolve(resp.data)) .catch((err) => reject(parseAxiosError(err))); }); @@ -39,7 +46,7 @@ const getFunctionList = async () => new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios - .get(url) + .get(url, { auth: basicAuth }) .then((resp) => resolve(resp.data)) .catch((err) => reject(parseAxiosError(err))); }); @@ -48,29 +55,36 @@ const invokeFunction = async (functionName, payload) => new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`; axios - .post(url, payload) + .post(url, payload, { auth: basicAuth }) .then((resp) => resolve(resp.data)) .catch((err) => reject(parseAxiosError(err))); }); -const checkFunctionHealth = async (functionName) => - new Promise((resolve, reject) => { +const checkFunctionHealth = async (functionName) => { + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`; axios - .get(url, { - headers: { 'X-REQUEST-TYPE': 'HEALTH-CHECK' }, - }) + .get( + url, + { + headers: { 'X-REQUEST-TYPE': 'HEALTH-CHECK' }, + }, + { auth: basicAuth }, + ) .then((resp) => resolve(resp)) .catch((err) => reject(parseAxiosError(err))); }); +}; const deployFunction = async (payload) => new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios - .post(url, payload) + .post(url, payload, { auth: basicAuth }) .then((resp) => resolve(resp.data)) - .catch((err) => reject(parseAxiosError(err))); + .catch((err) => { + reject(parseAxiosError(err)); + }); }); module.exports = { diff --git a/src/util/openfaas/index.js b/src/util/openfaas/index.js index 00b3720e13..7a1fce3cfa 100644 --- a/src/util/openfaas/index.js +++ b/src/util/openfaas/index.js @@ -11,6 +11,11 @@ const stats = require('../stats'); const { getMetadata, getTransformationMetadata } = require('../../v0/util'); const { HTTP_STATUS_CODES } = require('../../v0/util/constant'); +const FAAS_SCALE_TYPE = process.env.FAAS_SCALE_TYPE || 'capacity'; +const FAAS_SCALE_TARGET = process.env.FAAS_SCALE_TARGET || '4'; +const FAAS_SCALE_TARGET_PROPORTION = process.env.FAAS_SCALE_TARGET_PROPORTION || '0.70'; +const FAAS_SCALE_ZERO = process.env.FAAS_SCALE_ZERO || 'false'; +const FAAS_SCALE_ZERO_DURATION = process.env.FAAS_SCALE_ZERO_DURATION || '15m'; const FAAS_BASE_IMG = process.env.FAAS_BASE_IMG || 'rudderlabs/openfaas-flask:main'; const FAAS_MAX_PODS_IN_TEXT = process.env.FAAS_MAX_PODS_IN_TEXT || '40'; const FAAS_MIN_PODS_IN_TEXT = process.env.FAAS_MIN_PODS_IN_TEXT || '1'; @@ -125,7 +130,7 @@ const deployFaasFunction = async ( trMetadata = {}, ) => { try { - logger.debug('[Faas] Deploying a faas function'); + logger.debug(`[Faas] Deploying a faas function: ${functionName}`); let envProcess = 'python index.py'; const lvidsString = libraryVersionIDs.join(','); @@ -150,6 +155,11 @@ const deployFaasFunction = async ( 'parent-component': 'openfaas', 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, + 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, + 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, + 'com.openfaas.scale.target': FAAS_SCALE_TARGET, + 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, + 'com.openfaas.scale.type': FAAS_SCALE_TYPE, transformationId: trMetadata.transformationId, workspaceId: trMetadata.workspaceId, team: 'data-management', diff --git a/src/util/redis/redisConnector.test.js b/src/util/redis/redisConnector.test.js index e0491132ff..7cf2ccbbcf 100644 --- a/src/util/redis/redisConnector.test.js +++ b/src/util/redis/redisConnector.test.js @@ -2,6 +2,7 @@ const fs = require('fs'); const path = require('path'); const version = 'v0'; const { RedisDB } = require('./redisConnector'); +const { structuredLogger: logger } = require('@rudderstack/integrations-lib'); jest.mock('ioredis', () => require('../../../test/__mocks__/redis')); const sourcesList = ['shopify']; process.env.USE_REDIS_DB = 'true'; @@ -54,7 +55,7 @@ describe(`Source Tests`, () => { data.forEach((dataPoint, index) => { it(`${index}. ${source} - ${dataPoint.description}`, async () => { try { - const output = await transformer.process(dataPoint.input); + const output = await transformer.process(dataPoint.input, logger); expect(output).toEqual(dataPoint.output); } catch (error) { expect(error.message).toEqual(dataPoint.output.error); diff --git a/src/util/utils.js b/src/util/utils.js index 0ba6008368..d74603dd7a 100644 --- a/src/util/utils.js +++ b/src/util/utils.js @@ -22,6 +22,7 @@ const staticLookup = (transformerVersionId) => async (hostname, _, cb) => { try { ips = await resolver.resolve4(hostname); } catch (error) { + logger.error(`DNS Error Code: ${error.code} | Message : ${error.message}`); stats.timing('fetch_dns_resolve_time', resolveStartTime, { transformerVersionId, error: 'true', diff --git a/src/v0/destinations/campaign_manager/transform.js b/src/v0/destinations/campaign_manager/transform.js index 14bc6d2c19..403a79a971 100644 --- a/src/v0/destinations/campaign_manager/transform.js +++ b/src/v0/destinations/campaign_manager/transform.js @@ -243,7 +243,8 @@ const batchEvents = (eventChunksArray) => { return batchedResponseList; }; -const processRouterDest = async (inputs, reqMetadata) => { +const processRouterDest = async (inputs, reqMetadata, logger) => { + logger.debug(`Transformation router request received with size ${inputs.length}`); const batchErrorRespList = []; const eventChunksArray = []; const { destination } = inputs[0]; diff --git a/src/v0/destinations/mailchimp/utils.js b/src/v0/destinations/mailchimp/utils.js index 1f4fc03ee5..a726f23a39 100644 --- a/src/v0/destinations/mailchimp/utils.js +++ b/src/v0/destinations/mailchimp/utils.js @@ -1,9 +1,12 @@ const get = require('get-value'); const md5 = require('md5'); -const { InstrumentationError, NetworkError } = require('@rudderstack/integrations-lib'); +const { + InstrumentationError, + NetworkError, + structuredLogger: logger, +} = require('@rudderstack/integrations-lib'); const myAxios = require('../../../util/myAxios'); const { MappedToDestinationKey } = require('../../../constants'); -const logger = require('../../../logger'); const { isDefinedAndNotNull, isDefined, diff --git a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js index aa4b3aacc4..13e1b3a09a 100644 --- a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js +++ b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js @@ -514,7 +514,7 @@ describe('checkEventStatusViaSchemaMatching', () => { }); // The function correctly handles events with null values. - it('should correctly handle events with null values', () => { + it('should ignore event properties with null values', () => { const event = { input: [ { @@ -537,8 +537,6 @@ describe('checkEventStatusViaSchemaMatching', () => { const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - expect(result).toEqual({ - job1: 'invalid id', - }); + expect(result).toEqual({}); }); }); diff --git a/src/v0/destinations/marketo_bulk_upload/util.js b/src/v0/destinations/marketo_bulk_upload/util.js index 4c99ba7483..033239b5e4 100644 --- a/src/v0/destinations/marketo_bulk_upload/util.js +++ b/src/v0/destinations/marketo_bulk_upload/util.js @@ -3,6 +3,7 @@ const { RetryableError, NetworkError, TransformationError, + isDefinedAndNotNull, } = require('@rudderstack/integrations-lib'); const { handleHttpRequest } = require('../../../adapters/network'); const tags = require('../../util/tags'); @@ -360,7 +361,6 @@ const getFieldSchemaMap = async (accessToken, munchkinId) => { module: 'router', }, ); - if (fieldSchemaMapping.response.errors) { handleCommonErrorResponse( fieldSchemaMapping, @@ -411,7 +411,11 @@ const checkEventStatusViaSchemaMatching = (event, fieldMap) => { const expectedDataType = SCHEMA_DATA_TYPE_MAP[fieldMap[paramName]]; const actualDataType = typeof paramValue; - if (!mismatchedFields[job_id] && actualDataType !== expectedDataType) { + if ( + isDefinedAndNotNull(paramValue) && + !mismatchedFields[job_id] && + actualDataType !== expectedDataType + ) { mismatchedFields[job_id] = `invalid ${paramName}`; } }); diff --git a/src/v0/destinations/sfmc/transform.js b/src/v0/destinations/sfmc/transform.js index 53925bc7ed..bf474ff3f0 100644 --- a/src/v0/destinations/sfmc/transform.js +++ b/src/v0/destinations/sfmc/transform.js @@ -7,8 +7,8 @@ const { isDefinedAndNotNull, isEmpty, } = require('@rudderstack/integrations-lib'); -const myAxios = require('../../../util/myAxios'); const { EventType } = require('../../../constants'); +const { handleHttpRequest } = require('../../../adapters/network'); const { CONFIG_CATEGORIES, MAPPING_CONFIG, ENDPOINTS } = require('./config'); const { removeUndefinedAndNullValues, @@ -22,10 +22,8 @@ const { getHashFromArray, simpleProcessRouterDest, } = require('../../util'); -const { - getDynamicErrorType, - nodeSysErrorToStatus, -} = require('../../../adapters/utils/networkUtils'); +const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); +const { isHttpStatusSuccess } = require('../../util'); const tags = require('../../util/tags'); const { JSON_MIME_TYPE } = require('../../util/constant'); @@ -34,51 +32,38 @@ const CONTACT_KEY_KEY = 'Contact Key'; // DOC: https://developer.salesforce.com/docs/atlas.en-us.mc-app-development.meta/mc-app-development/access-token-s2s.htm const getToken = async (clientId, clientSecret, subdomain) => { - try { - const resp = await myAxios.post( - `https://${subdomain}.${ENDPOINTS.GET_TOKEN}`, - { - grant_type: 'client_credentials', - client_id: clientId, - client_secret: clientSecret, - }, - { - 'Content-Type': JSON_MIME_TYPE, - }, - { - destType: 'sfmc', - feature: 'transformation', - endpointPath: '/token', - requestMethod: 'POST', - module: 'router', - }, - ); - if (resp && resp.data) { - return resp.data.access_token; - } - const status = resp.status || 400; + const { processedResponse: processedResponseSfmc } = await handleHttpRequest( + 'post', + `https://${subdomain}.${ENDPOINTS.GET_TOKEN}`, + { + grant_type: 'client_credentials', + client_id: clientId, + client_secret: clientSecret, + }, + { + 'Content-Type': JSON_MIME_TYPE, + }, + { + destType: 'sfmc', + feature: 'transformation', + endpointPath: '/token', + requestMethod: 'POST', + module: 'router', + }, + ); + + if (!isHttpStatusSuccess(processedResponseSfmc.status)) { throw new NetworkError( 'Could not retrieve access token', - status, + processedResponseSfmc.status || 400, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(processedResponseSfmc.status || 400), }, - resp, + processedResponseSfmc.response, ); - } catch (error) { - if (!isEmpty(error.response)) { - const status = error.status || 400; - throw new NetworkError(`Authorization Failed ${error.response.statusText}`, status, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), - }); - } else { - const httpError = nodeSysErrorToStatus(error.code); - const status = httpError.status || 400; - throw new NetworkError(`Authorization Failed ${httpError.message}`, status, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), - }); - } } + + return processedResponseSfmc.response.access_token; }; // DOC : https://developer.salesforce.com/docs/atlas.en-us.noversion.mc-apis.meta/mc-apis/createContacts.htm diff --git a/src/v0/destinations/twitter_ads/transform.js b/src/v0/destinations/twitter_ads/transform.js index 268dca3636..365663925e 100644 --- a/src/v0/destinations/twitter_ads/transform.js +++ b/src/v0/destinations/twitter_ads/transform.js @@ -156,7 +156,8 @@ function validateRequest(message) { } } -function process(event) { +function process(event, requestMetadata, logger) { + logger.info(`[TWITTER ADS]: Transforming request received with info`); const { message, metadata, destination } = event; validateRequest(message); diff --git a/src/v0/sources/canny/transform.js b/src/v0/sources/canny/transform.js index 9188f5ac34..38ed5e137e 100644 --- a/src/v0/sources/canny/transform.js +++ b/src/v0/sources/canny/transform.js @@ -2,7 +2,6 @@ const sha256 = require('sha256'); const { TransformationError } = require('@rudderstack/integrations-lib'); const Message = require('../message'); const { voterMapping, authorMapping, checkForRequiredFields } = require('./util'); -const { logger } = require('../../../logger'); const CannyOperation = { VOTE_CREATED: 'vote.created', @@ -15,7 +14,7 @@ const CannyOperation = { * @param {*} event * @param {*} typeOfUser */ -function settingIds(message, event, typeOfUser) { +function settingIds(message, event, typeOfUser, logger) { const clonedMessage = { ...message }; try { // setting up userId @@ -48,7 +47,7 @@ function settingIds(message, event, typeOfUser) { * @param {*} typeOfUser * @returns message */ -function createMessage(event, typeOfUser) { +function createMessage(event, typeOfUser, logger) { const message = new Message(`Canny`); message.setEventType('track'); @@ -61,7 +60,7 @@ function createMessage(event, typeOfUser) { message.context.integration.version = '1.0.0'; - const finalMessage = settingIds(message, event, typeOfUser); + const finalMessage = settingIds(message, event, typeOfUser, logger); checkForRequiredFields(finalMessage); @@ -73,7 +72,7 @@ function createMessage(event, typeOfUser) { return finalMessage; } -function process(event) { +function process(event, logger) { let typeOfUser; switch (event.type) { @@ -86,6 +85,6 @@ function process(event) { typeOfUser = 'author'; } - return createMessage(event, typeOfUser); + return createMessage(event, typeOfUser, logger); } module.exports = { process }; diff --git a/src/v0/sources/shopify/transform.js b/src/v0/sources/shopify/transform.js index b6d2a4c6cf..4886fb3df1 100644 --- a/src/v0/sources/shopify/transform.js +++ b/src/v0/sources/shopify/transform.js @@ -16,7 +16,6 @@ const { const { RedisDB } = require('../../../util/redis/redisConnector'); const { removeUndefinedAndNullValues, isDefinedAndNotNull } = require('../../util'); const Message = require('../message'); -const logger = require('../../../logger'); const { EventType } = require('../../../constants'); const { INTEGERATION, @@ -206,7 +205,7 @@ const processEvent = async (inputEvent, metricMetadata) => { }; const isIdentifierEvent = (event) => ['rudderIdentifier', 'rudderSessionIdentifier'].includes(event?.event); -const processIdentifierEvent = async (event, metricMetadata) => { +const processIdentifierEvent = async (event, metricMetadata, logger) => { if (useRedisDatabase) { let value; let field; @@ -256,13 +255,13 @@ const processIdentifierEvent = async (event, metricMetadata) => { } return NO_OPERATION_SUCCESS; }; -const process = async (event) => { +const process = async (event, logger) => { const metricMetadata = { writeKey: event.query_parameters?.writeKey?.[0], source: 'SHOPIFY', }; if (isIdentifierEvent(event)) { - return processIdentifierEvent(event, metricMetadata); + return processIdentifierEvent(event, metricMetadata, logger); } const response = await processEvent(event, metricMetadata); return response; diff --git a/src/v0/sources/shopify/util.js b/src/v0/sources/shopify/util.js index c4bbb61b9c..3dc54cc434 100644 --- a/src/v0/sources/shopify/util.js +++ b/src/v0/sources/shopify/util.js @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/naming-convention */ const { v5 } = require('uuid'); const sha256 = require('sha256'); -const { TransformationError } = require('@rudderstack/integrations-lib'); +const { TransformationError, structuredLogger: logger } = require('@rudderstack/integrations-lib'); const stats = require('../../../util/stats'); const { constructPayload, @@ -12,7 +12,6 @@ const { isDefinedAndNotNull, } = require('../../util'); const { RedisDB } = require('../../../util/redis/redisConnector'); -const logger = require('../../../logger'); const { lineItemsMappingJSON, productMappingJSON, diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 32872cc5d9..ac1bacf404 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -1328,12 +1328,19 @@ const generateExclusionList = (mappingConfig) => */ function extractCustomFields(message, payload, keys, exclusionFields) { const mappingKeys = []; + // Define reserved words + const reservedWords = ['__proto__', 'constructor', 'prototype']; + + const isReservedWord = (key) => reservedWords.includes(key); + if (Array.isArray(keys)) { keys.forEach((key) => { const messageContext = get(message, key); if (messageContext) { Object.keys(messageContext).forEach((k) => { - if (!exclusionFields.includes(k)) mappingKeys.push(k); + if (!exclusionFields.includes(k) && !isReservedWord(k)) { + mappingKeys.push(k); + } }); mappingKeys.forEach((mappingKey) => { if (!(typeof messageContext[mappingKey] === 'undefined')) { @@ -1344,7 +1351,9 @@ function extractCustomFields(message, payload, keys, exclusionFields) { }); } else if (keys === 'root') { Object.keys(message).forEach((k) => { - if (!exclusionFields.includes(k)) mappingKeys.push(k); + if (!exclusionFields.includes(k) && !isReservedWord(k)) { + mappingKeys.push(k); + } }); mappingKeys.forEach((mappingKey) => { if (!(typeof message[mappingKey] === 'undefined')) { diff --git a/src/v0/util/index.test.js b/src/v0/util/index.test.js index 810eb5a9d4..c34d513325 100644 --- a/src/v0/util/index.test.js +++ b/src/v0/util/index.test.js @@ -506,3 +506,187 @@ describe('validateEventAndLowerCaseConversion Tests', () => { }).toThrow(InstrumentationError); }); }); + +describe('extractCustomFields', () => { + // Handle reserved words in message keys + it('should handle reserved word "prototype" in message keys when keys are provided', () => { + const message = { + traits: { + firstName: 'John', + lastName: 'Doe', + email: 'john.doe@example.com', + prototype: 'reserved', + }, + context: { + traits: { + phone: '1234567890', + city: 'New York', + country: 'USA', + prototype: 'reserved', + }, + }, + properties: { + title: 'Developer', + organization: 'ABC Company', + zip: '12345', + prototype: 'reserved', + }, + }; + + const payload = {}; + + const keys = ['properties', 'context.traits', 'traits']; + + const exclusionFields = [ + 'firstName', + 'lastName', + 'phone', + 'title', + 'organization', + 'city', + 'region', + 'country', + 'zip', + 'image', + 'timezone', + ]; + + const result = utilities.extractCustomFields(message, payload, keys, exclusionFields); + + expect(result).toEqual({ + email: 'john.doe@example.com', + }); + }); + + it('should handle reserved word "__proto__" in message keys when keys are provided', () => { + const message = { + traits: { + firstName: 'John', + lastName: 'Doe', + email: 'john.doe@example.com', + __proto__: 'reserved', + }, + context: { + traits: { + phone: '1234567890', + city: 'New York', + country: 'USA', + __proto__: 'reserved', + }, + }, + properties: { + title: 'Developer', + organization: 'ABC Company', + zip: '12345', + __proto__: 'reserved', + }, + }; + + const payload = {}; + + const keys = ['properties', 'context.traits', 'traits']; + + const exclusionFields = [ + 'firstName', + 'lastName', + 'phone', + 'title', + 'organization', + 'city', + 'region', + 'country', + 'zip', + 'image', + 'timezone', + ]; + const result = utilities.extractCustomFields(message, payload, keys, exclusionFields); + expect(result).toEqual({ + email: 'john.doe@example.com', + }); + }); + + it('should handle reserved word "constructor" in message keys when keys are provided', () => { + const message = { + traits: { + firstName: 'John', + lastName: 'Doe', + email: 'john.doe@example.com', + constructor: 'reserved', + }, + context: { + traits: { + phone: '1234567890', + city: 'New York', + country: 'USA', + constructor: 'reserved', + }, + }, + properties: { + title: 'Developer', + organization: 'ABC Company', + zip: '12345', + constructor: 'reserved', + }, + }; + + const payload = {}; + + const keys = ['properties', 'context.traits', 'traits']; + + const exclusionFields = [ + 'firstName', + 'lastName', + 'phone', + 'title', + 'organization', + 'city', + 'region', + 'country', + 'zip', + 'image', + 'timezone', + ]; + const result = utilities.extractCustomFields(message, payload, keys, exclusionFields); + expect(result).toEqual({ + email: 'john.doe@example.com', + }); + }); + + it('should handle reserved words in message keys when key is root', () => { + const message = { + firstName: 'John', + lastName: 'Doe', + email: 'john.doe@example.com', + prototype: 'reserved', + phone: '1234567890', + city: 'New York', + country: 'USA', + __proto__: 'reserved', + constructor: 'reserved', + }; + + const payload = {}; + + const keys = 'root'; + + const exclusionFields = [ + 'firstName', + 'lastName', + 'phone', + 'title', + 'organization', + 'city', + 'region', + 'country', + 'zip', + 'image', + 'timezone', + ]; + + const result = utilities.extractCustomFields(message, payload, keys, exclusionFields); + + expect(result).toEqual({ + email: 'john.doe@example.com', + }); + }); +}); diff --git a/test/__tests__/pinterestConversion-cdk.test.ts b/test/__tests__/pinterestConversion-cdk.test.ts index f4da92eea9..6aaa710ed7 100644 --- a/test/__tests__/pinterestConversion-cdk.test.ts +++ b/test/__tests__/pinterestConversion-cdk.test.ts @@ -1,6 +1,7 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import fs from 'fs'; import path from 'path'; -import { processCdkV2Workflow, getWorkflowEngine, executeWorkflow } from '../../src/cdk/v2/handler'; +import { executeWorkflow, getWorkflowEngine, processCdkV2Workflow } from '../../src/cdk/v2/handler'; import tags from '../../src/v0/util/tags'; const integration = 'pinterest_tag'; @@ -22,7 +23,12 @@ describe(`${name} Tests`, () => { it(`${name} - payload: ${index}`, async () => { const expected = expectedData[index]; try { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.PROCESSOR); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.PROCESSOR, + logger, + ); expect(output).toEqual(expected); } catch (error: any) { expect(error.message).toEqual(expected.error); @@ -46,7 +52,12 @@ describe(`${name} Tests`, () => { it(`${name} - payload: ${index}`, async () => { const expected = expectedData[index]; try { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.PROCESSOR); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.PROCESSOR, + logger, + ); expect(output).toEqual(expected); } catch (error: any) { expect(error.message).toEqual(expected.error); @@ -91,6 +102,7 @@ describe(`${name} Tests`, () => { integration, inputRouterErrorData, tags.FEATURES.ROUTER, + logger, ); expect(output).toEqual(expectedRouterErrorData); }); @@ -98,7 +110,12 @@ describe(`${name} Tests`, () => { describe('Default Batch size', () => { inputRouterData.forEach((input, index) => { it(`Payload: ${index}`, async () => { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.ROUTER); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.ROUTER, + logger, + ); expect(output).toEqual(expectedRouterData[index]); }); }); diff --git a/test/__tests__/user_transformation.test.js b/test/__tests__/user_transformation.test.js index 8b781cda9a..924bf4f791 100644 --- a/test/__tests__/user_transformation.test.js +++ b/test/__tests__/user_transformation.test.js @@ -37,6 +37,10 @@ const { parserForImport } = require("../../src/util/parser"); const { RetryRequestError, RespStatusError } = require("../../src/util/utils"); const OPENFAAS_GATEWAY_URL = "http://localhost:8080"; +const defaultBasicAuth = { + "username": "", + "password": "" +}; const randomID = () => Math.random() @@ -1400,12 +1404,14 @@ describe("Python transformations", () => { expect(axios.post).toHaveBeenCalledTimes(1); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/system/functions`, - expect.objectContaining({ name: funcName, service: funcName }) + expect.objectContaining({ name: funcName, service: funcName }), + { auth: defaultBasicAuth }, ); expect(axios.get).toHaveBeenCalledTimes(1); expect(axios.get).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, - {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}} + {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}, + { auth: defaultBasicAuth }, ); }); @@ -1622,7 +1628,8 @@ describe("Python transformations", () => { expect(axios.post).toHaveBeenCalledTimes(1); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, - inputData + inputData, + { auth: defaultBasicAuth }, ); }); @@ -1655,17 +1662,20 @@ describe("Python transformations", () => { expect(axios.post).toHaveBeenCalledTimes(2); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, - inputData + inputData, + { auth: defaultBasicAuth }, ); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/system/functions`, - expect.objectContaining({ name: funcName, service: funcName }) + expect.objectContaining({ name: funcName, service: funcName }), + { auth: defaultBasicAuth }, ); expect(axios.get).toHaveBeenCalledTimes(1); expect(axios.get).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, - {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}} + {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}, + { auth: defaultBasicAuth }, ); }); diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index 266619b6ac..e46357f824 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -1,13 +1,13 @@ import fs from 'fs'; -import path from 'path'; -import request from 'supertest'; import { createHttpTerminator } from 'http-terminator'; import Koa from 'koa'; import bodyParser from 'koa-bodyparser'; +import path from 'path'; import setValue from 'set-value'; -import { applicationRoutes } from '../../src/routes'; -import { FetchHandler } from '../../src/helpers/fetchHandlers'; +import request from 'supertest'; import networkHandlerFactory from '../../src/adapters/networkHandlerFactory'; +import { FetchHandler } from '../../src/helpers/fetchHandlers'; +import { applicationRoutes } from '../../src/routes'; let server: any; const OLD_ENV = process.env; diff --git a/test/integrations/destinations/fullstory/processor/data.ts b/test/integrations/destinations/fullstory/processor/data.ts index d206b4a84f..9c8d29c7e8 100644 --- a/test/integrations/destinations/fullstory/processor/data.ts +++ b/test/integrations/destinations/fullstory/processor/data.ts @@ -149,7 +149,7 @@ export const data = [ id: 's001', }, user: { - id: 'u001', + uid: 'u001', }, }, JSON_ARRAY: {}, diff --git a/test/integrations/destinations/sfmc/network.ts b/test/integrations/destinations/sfmc/network.ts index 7564d8c6d5..93854e3691 100644 --- a/test/integrations/destinations/sfmc/network.ts +++ b/test/integrations/destinations/sfmc/network.ts @@ -11,4 +11,54 @@ export const networkCallsData = [ }, }, }, + { + httpReq: { + url: 'https://testHandleHttpRequest401.auth.marketingcloudapis.com/v2/token', + method: 'POST', + }, + httpRes: { + status: 401, + data: { + error: 'invalid_client', + error_description: + 'Invalid client ID. Use the client ID in Marketing Cloud Installed Packages.', + error_uri: 'https://developer.salesforce.com/docs', + }, + }, + }, + { + httpReq: { + url: 'https://testHandleHttpRequest429.auth.marketingcloudapis.com/v2/token', + method: 'POST', + }, + httpRes: { + status: 429, + data: { + message: 'Your requests are temporarily blocked.', + errorcode: 50200, + documentation: + 'https://developer.salesforce.com/docs/atlas.en-us.mc-apis.meta/mc-apis/error-handling.htm', + }, + }, + }, + { + httpReq: { + url: 'https://testHandleHttpRequest-dns.auth.marketingcloudapis.com/v2/token', + method: 'POST', + }, + httpRes: { + data: {}, + status: 400, + }, + }, + { + httpReq: { + url: 'https://testHandleHttpRequest-null.auth.marketingcloudapis.com/v2/token', + method: 'POST', + }, + httpRes: { + data: null, + status: 500, + }, + }, ]; diff --git a/test/integrations/destinations/sfmc/processor/data.ts b/test/integrations/destinations/sfmc/processor/data.ts index b2839908ad..883032d223 100644 --- a/test/integrations/destinations/sfmc/processor/data.ts +++ b/test/integrations/destinations/sfmc/processor/data.ts @@ -1894,4 +1894,326 @@ export const data = [ }, }, }, + { + name: 'sfmc', + description: 'Tests 401 un authenticated code from sfmc', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + message: { + event: 'message event', + type: 'track', + userId: '12345', + properties: { + id: 'id101', + contactId: 'cid101', + email: 'testemail@gmail.com', + accountNumber: '99110099', + patronName: 'SP', + }, + }, + destination: { + ID: '1pYpzzvcn7AQ2W9GGIAZSsN6Mfq', + Name: 'SFMC', + DestinationDefinition: { + ID: '1pYpYSeQd8OeN6xPdw6VGDzqUd1', + Name: 'SFMC', + DisplayName: 'Salesforce Marketing Cloud', + Config: { + destConfig: [], + excludeKeys: [], + includeKeys: [], + saveDestinationResponse: false, + supportedSourceTypes: [], + transformAt: 'processor', + }, + ResponseRules: {}, + }, + Config: { + clientId: 'testHandleHttpRequest401', + clientSecret: 'testHandleHttpRequest401', + createOrUpdateContacts: false, + eventDelivery: true, + eventDeliveryTS: 1615371070621, + eventToExternalKey: [ + { + from: 'Event Name', + to: 'C500FD37-155C-49BD-A21B-AFCEF3D1A9CB', + }, + { + from: 'Watch', + to: 'C500FD37-155C-49BD-A21B-AFCEF3D1A9CB', + }, + ], + eventToPrimaryKey: [ + { + from: 'userId', + to: 'User Key', + }, + { + from: 'watch', + to: 'Guest Key, Contact Key', + }, + ], + eventToUUID: [ + { + event: 'Event Name', + uuid: true, + }, + ], + eventToDefinitionMapping: [ + { + from: 'message event', + to: 'test-event-definition', + }, + ], + externalKey: 'f3ffa19b-e0b3-4967-829f-549b781080e6', + subDomain: 'testHandleHttpRequest401', + }, + Enabled: true, + Transformations: [], + }, + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + error: + '{"message":"Could not retrieve access token","destinationResponse":{"error":"invalid_client","error_description":"Invalid client ID. Use the client ID in Marketing Cloud Installed Packages.","error_uri":"https://developer.salesforce.com/docs"}}', + statTags: { + destType: 'SFMC', + errorCategory: 'network', + errorType: 'aborted', + feature: 'processor', + implementation: 'native', + module: 'destination', + }, + statusCode: 401, + }, + ], + }, + }, + }, + { + name: 'sfmc', + description: 'Tests 429 status code from sfmc', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + message: { + event: 'message event', + type: 'track', + userId: '12345', + properties: { + id: 'id101', + contactId: 'cid101', + email: 'testemail@gmail.com', + accountNumber: '99110099', + patronName: 'SP', + }, + }, + destination: { + ID: '1pYpzzvcn7AQ2W9GGIAZSsN6Mfq', + Name: 'SFMC', + DestinationDefinition: { + ID: '1pYpYSeQd8OeN6xPdw6VGDzqUd1', + Name: 'SFMC', + DisplayName: 'Salesforce Marketing Cloud', + Config: { + destConfig: [], + excludeKeys: [], + includeKeys: [], + saveDestinationResponse: false, + supportedSourceTypes: [], + transformAt: 'processor', + }, + ResponseRules: {}, + }, + Config: { + clientId: 'testHandleHttpRequest429', + clientSecret: 'testHandleHttpRequest429', + subDomain: 'testHandleHttpRequest429', + }, + Enabled: true, + Transformations: [], + }, + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + error: + '{"message":"Could not retrieve access token","destinationResponse":{"message":"Your requests are temporarily blocked.","errorcode":50200,"documentation":"https://developer.salesforce.com/docs/atlas.en-us.mc-apis.meta/mc-apis/error-handling.htm"}}', + statTags: { + destType: 'SFMC', + errorCategory: 'network', + errorType: 'throttled', + feature: 'processor', + implementation: 'native', + module: 'destination', + }, + statusCode: 429, + }, + ], + }, + }, + }, + { + name: 'sfmc', + description: 'Tests DNS lookup failure for sfmc', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + message: { + event: 'message event', + type: 'track', + userId: '12345', + properties: { + id: 'id101', + contactId: 'cid101', + email: 'testemail@gmail.com', + accountNumber: '99110099', + patronName: 'SP', + }, + }, + destination: { + ID: '1pYpzzvcn7AQ2W9GGIAZSsN6Mfq', + Name: 'SFMC', + DestinationDefinition: { + ID: '1pYpYSeQd8OeN6xPdw6VGDzqUd1', + Name: 'SFMC', + DisplayName: 'Salesforce Marketing Cloud', + Config: { + destConfig: [], + excludeKeys: [], + includeKeys: [], + saveDestinationResponse: false, + supportedSourceTypes: [], + transformAt: 'processor', + }, + ResponseRules: {}, + }, + Config: { + clientId: 'testHandleHttpRequest-dns', + clientSecret: 'testHandleHttpRequest-dns', + subDomain: 'testHandleHttpRequest-dns', + }, + Enabled: true, + Transformations: [], + }, + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + error: '{"message":"Could not retrieve access token","destinationResponse":{}}', + statTags: { + destType: 'SFMC', + errorCategory: 'network', + errorType: 'aborted', + feature: 'processor', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, + }, + ], + }, + }, + }, + { + name: 'sfmc', + description: 'Test 500 status failure for sfmc', + feature: 'processor', + module: 'destination', + version: 'v0', + input: { + request: { + body: [ + { + message: { + event: 'message event', + type: 'track', + userId: '12345', + properties: { + id: 'id101', + contactId: 'cid101', + email: 'testemail@gmail.com', + accountNumber: '99110099', + patronName: 'SP', + }, + }, + destination: { + ID: '1pYpzzvcn7AQ2W9GGIAZSsN6Mfq', + Name: 'SFMC', + DestinationDefinition: { + ID: '1pYpYSeQd8OeN6xPdw6VGDzqUd1', + Name: 'SFMC', + DisplayName: 'Salesforce Marketing Cloud', + Config: { + destConfig: [], + excludeKeys: [], + includeKeys: [], + saveDestinationResponse: false, + supportedSourceTypes: [], + transformAt: 'processor', + }, + ResponseRules: {}, + }, + Config: { + clientId: 'testHandleHttpRequest-null', + clientSecret: 'testHandleHttpRequest-null', + subDomain: 'testHandleHttpRequest-null', + }, + Enabled: true, + Transformations: [], + }, + }, + ], + }, + }, + output: { + response: { + status: 200, + body: [ + { + error: 'Could not retrieve access token', + statTags: { + destType: 'SFMC', + errorCategory: 'network', + errorType: 'retryable', + feature: 'processor', + implementation: 'native', + module: 'destination', + }, + statusCode: 500, + }, + ], + }, + }, + }, ];