diff --git a/.eslintrc.json b/.eslintrc.json index 7258c5c536..556470697d 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -26,6 +26,7 @@ "off", { "cases": { "camelCase": true, "pascalCase": true, "kebabCase": true } } ], + "import/no-import-module-exports": "off", "unicorn/no-instanceof-array": "error", "unicorn/no-static-only-class": "error", "unicorn/consistent-destructuring": "error", diff --git a/package-lock.json b/package-lock.json index 65659a745c..fd42692109 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,7 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", - "@rudderstack/integrations-lib": "^0.2.7", + "@rudderstack/integrations-lib": "^0.2.8", "@rudderstack/workflow-engine": "^0.7.5", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", @@ -118,41 +118,6 @@ "typescript": "^5.0.4" } }, - "../rudder-integrations-lib": { - "name": "@rudderstack/integrations-lib", - "version": "0.1.10", - "extraneous": true, - "license": "MIT", - "dependencies": { - "@rudderstack/workflow-engine": "^0.5.7", - "axios": "^1.4.0", - "axios-mock-adapter": "^1.22.0", - "crypto": "^1.0.1", - "get-value": "^3.0.1", - "handlebars": "^4.7.8", - "lodash": "^4.17.21", - "moment": "^2.29.4", - "moment-timezone": "^0.5.43", - "set-value": "^4.1.0", - "sha256": "^0.2.0", - "tslib": "^2.4.0", - "winston": "^3.11.0" - }, - "devDependencies": { - "@types/get-value": "^3.0.3", - "@types/jest": "^29.5.4", - "@types/lodash": "^4.14.195", - "@types/node": "^20.3.3", - "@types/set-value": "^4.0.1", - "@types/sha256": "^0.2.0", - "jest": "^29.4.3", - "pre-commit": "^1.2.2", - "prettier": "^2.8.4", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.1.6" - } - }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "resolved": "https://registry.npmjs.org/@aashutoshrathi/word-wrap/-/word-wrap-1.2.6.tgz", @@ -4472,9 +4437,9 @@ } }, "node_modules/@rudderstack/integrations-lib": { - "version": "0.2.7", - "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.2.7.tgz", - "integrity": "sha512-F0QVIT2vpSeI+GcUk7AwxMJrmM5SsRk8AS6oH4nHkjjfDoKjdh9rrDVzhXKUYF//FAi32ecmSsW+/6ioB8louw==", + "version": "0.2.8", + "resolved": "https://registry.npmjs.org/@rudderstack/integrations-lib/-/integrations-lib-0.2.8.tgz", + "integrity": "sha512-5CJoFFCRDhG7busCGVktKqEEXO0DbFqJ56TOT+jyDdoTf8sZ7SsSJ4NCZYmSplZrbQGj2R+aArnQnpxA4hPGmA==", "dependencies": { "axios": "^1.4.0", "axios-mock-adapter": "^1.22.0", diff --git a/package.json b/package.json index c91d836c34..c839bc8acc 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,7 @@ "@koa/router": "^12.0.0", "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", - "@rudderstack/integrations-lib": "^0.2.7", + "@rudderstack/integrations-lib": "^0.2.8", "@rudderstack/workflow-engine": "^0.7.5", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", diff --git a/src/cdk/v2/handler.ts b/src/cdk/v2/handler.ts index edd14e7298..c437247f74 100644 --- a/src/cdk/v2/handler.ts +++ b/src/cdk/v2/handler.ts @@ -1,9 +1,9 @@ import { - WorkflowEngine, - WorkflowEngineFactory, - TemplateType, ExecutionBindings, StepOutput, + TemplateType, + WorkflowEngine, + WorkflowEngineFactory, } from '@rudderstack/workflow-engine'; import { FixMe } from '../../util/types'; @@ -11,9 +11,9 @@ import tags from '../../v0/util/tags'; import { getErrorInfo, + getPlatformBindingsPaths, getRootPathForDestination, getWorkflowPath, - getPlatformBindingsPaths, isCdkV2Destination, } from './utils'; @@ -82,10 +82,12 @@ export async function processCdkV2Workflow( destType: string, parsedEvent: FixMe, feature: string, + logger: FixMe, requestMetadata: NonNullable = {}, bindings: Record = {}, ) { try { + logger.debug(`Processing cdkV2 workflow`); const workflowEngine = await getCachedWorkflowEngine(destType, feature, bindings); return await executeWorkflow(workflowEngine, parsedEvent, requestMetadata); } catch (error) { diff --git a/src/controllers/bulkUpload.ts b/src/controllers/bulkUpload.ts index dbd77dc07f..28556dd5df 100644 --- a/src/controllers/bulkUpload.ts +++ b/src/controllers/bulkUpload.ts @@ -1,10 +1,10 @@ /* eslint-disable global-require, import/no-dynamic-require, @typescript-eslint/no-unused-vars */ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { client as errNotificationClient } from '../util/errorNotifier'; -import logger from '../logger'; import { + getDestFileUploadHandler, getJobStatusHandler, getPollStatusHandler, - getDestFileUploadHandler, } from '../util/fetchDestinationHandlers'; import { CatchErr, ContextBodySimple } from '../util/types'; // TODO: To be refactored and redisgned @@ -31,10 +31,7 @@ const getReqMetadata = (ctx) => { }; export const fileUpload = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /fileUpload route', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Bulk-Upload): Request to transformer:: /fileUpload route', ctx.request.body); const getReqMetadataFileUpload = () => { try { const reqBody = ctx.request.body; @@ -69,18 +66,12 @@ export const fileUpload = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Response from transformer:: /fileUpload route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Response from transformer:: /fileUpload route', ctx.body); return ctx.body; }; export const pollStatus = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /pollStatus route', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Bulk-Upload): Request to transformer:: /pollStatus route', ctx.request.body); const { destType }: ContextBodySimple = ctx.request.body; const destFileUploadHandler = getPollStatusHandler('v0', destType.toLowerCase()); @@ -104,17 +95,14 @@ export const pollStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /pollStatus route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /pollStatus route', ctx.body); return ctx.body; }; export const getWarnJobStatus = async (ctx) => { logger.debug( 'Native(Bulk-Upload): Request to transformer:: /getWarningJobs route', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { destType }: ContextBodySimple = ctx.request.body; @@ -140,17 +128,14 @@ export const getWarnJobStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /getWarningJobs route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /getWarningJobs route', ctx.body); return ctx.body; }; export const getFailedJobStatus = async (ctx) => { logger.debug( 'Native(Bulk-Upload): Request to transformer:: /getFailedJobs route', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { destType }: ContextBodySimple = ctx.request.body; @@ -176,9 +161,6 @@ export const getFailedJobStatus = async (ctx) => { }); } ctx.body = response; - logger.debug( - 'Native(Bulk-Upload): Request from transformer:: /getFailedJobs route', - JSON.stringify(ctx.body), - ); + logger.debug('Native(Bulk-Upload): Request from transformer:: /getFailedJobs route', ctx.body); return ctx.body; }; diff --git a/src/controllers/delivery.ts b/src/controllers/delivery.ts index 4334dc33b2..8ac7c9902a 100644 --- a/src/controllers/delivery.ts +++ b/src/controllers/delivery.ts @@ -1,28 +1,30 @@ /* eslint-disable prefer-destructuring */ /* eslint-disable sonarjs/no-duplicate-string */ +import { + isDefinedAndNotNullAndNotEmpty, + structuredLogger as logger, +} from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { isDefinedAndNotNullAndNotEmpty } from '@rudderstack/integrations-lib'; +import { ServiceSelector } from '../helpers/serviceSelector'; +import { DeliveryTestService } from '../services/delivertTest/deliveryTest'; +import { DestinationPostTransformationService } from '../services/destination/postTransformation'; import { MiscService } from '../services/misc'; import { - DeliveryV1Response, DeliveryV0Response, + DeliveryV1Response, ProcessorTransformationOutput, ProxyV0Request, ProxyV1Request, } from '../types/index'; -import { ServiceSelector } from '../helpers/serviceSelector'; -import { DeliveryTestService } from '../services/delivertTest/deliveryTest'; -import { ControllerUtility } from './util'; -import logger from '../logger'; -import { DestinationPostTransformationService } from '../services/destination/postTransformation'; -import tags from '../v0/util/tags'; import { FixMe } from '../util/types'; +import tags from '../v0/util/tags'; +import { ControllerUtility } from './util'; const NON_DETERMINABLE = 'Non-determinable'; export class DeliveryController { public static async deliverToDestination(ctx: Context) { - logger.debug('Native(Delivery):: Request to transformer::', JSON.stringify(ctx.request.body)); + logger.info('Native(Delivery):: Request to transformer::', ctx.request.body); let deliveryResponse: DeliveryV0Response; const requestMetadata = MiscService.getRequestMetadata(ctx); const deliveryRequest = ctx.request.body as ProxyV0Request; @@ -52,12 +54,12 @@ export class DeliveryController { ctx.body = { output: deliveryResponse }; ControllerUtility.deliveryPostProcess(ctx, deliveryResponse.status); - logger.debug('Native(Delivery):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery):: Response from transformer::', ctx.body); return ctx; } public static async deliverToDestinationV1(ctx: Context) { - logger.debug('Native(Delivery):: Request to transformer::', JSON.stringify(ctx.request.body)); + logger.debug('Native(Delivery):: Request to transformer::', ctx.request.body); let deliveryResponse: DeliveryV1Response; const requestMetadata = MiscService.getRequestMetadata(ctx); const deliveryRequest = ctx.request.body as ProxyV1Request; @@ -91,15 +93,12 @@ export class DeliveryController { ControllerUtility.deliveryPostProcess(ctx); } - logger.debug('Native(Delivery):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery):: Response from transformer::', ctx.body); return ctx; } public static async testDestinationDelivery(ctx: Context) { - logger.debug( - 'Native(Delivery-Test):: Request to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Delivery-Test):: Request to transformer::', ctx.request.body); const { destination }: { destination: string } = ctx.params; const { version }: { version: string } = ctx.params; const { @@ -117,7 +116,7 @@ export class DeliveryController { ); ctx.body = { output: response }; ControllerUtility.postProcess(ctx); - logger.debug('Native(Delivery-Test):: Response from transformer::', JSON.stringify(ctx.body)); + logger.debug('Native(Delivery-Test):: Response from transformer::', ctx.body); return ctx; } } diff --git a/src/controllers/destination.ts b/src/controllers/destination.ts index d8b3c94524..35606ea62e 100644 --- a/src/controllers/destination.ts +++ b/src/controllers/destination.ts @@ -1,29 +1,27 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { MiscService } from '../services/misc'; -import { DestinationPreTransformationService } from '../services/destination/preTransformation'; +import { ServiceSelector } from '../helpers/serviceSelector'; import { DestinationPostTransformationService } from '../services/destination/postTransformation'; +import { DestinationPreTransformationService } from '../services/destination/preTransformation'; +import { MiscService } from '../services/misc'; import { + ErrorDetailer, ProcessorTransformationRequest, - RouterTransformationRequest, ProcessorTransformationResponse, + RouterTransformationRequest, RouterTransformationResponse, } from '../types/index'; -import { ServiceSelector } from '../helpers/serviceSelector'; -import { ControllerUtility } from './util'; +import { DynamicConfigParser } from '../util/dynamicConfigParser'; import stats from '../util/stats'; -import logger from '../logger'; import { getIntegrationVersion } from '../util/utils'; -import tags from '../v0/util/tags'; -import { DynamicConfigParser } from '../util/dynamicConfigParser'; import { checkInvalidRtTfEvents } from '../v0/util'; +import tags from '../v0/util/tags'; +import { ControllerUtility } from './util'; export class DestinationController { public static async destinationTransformAtProcessor(ctx: Context) { const startTime = new Date(); - logger.debug( - 'Native(Process-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Process-Transform):: Requst to transformer::', ctx.request.body); let resplist: ProcessorTransformationResponse[]; const requestMetadata = MiscService.getRequestMetadata(ctx); let events = ctx.request.body as ProcessorTransformationRequest[]; @@ -35,6 +33,9 @@ export class DestinationController { ...metaTags, }); const integrationService = ServiceSelector.getDestinationService(events); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); try { integrationService.init(); events = DestinationPreTransformationService.preProcess( @@ -50,6 +51,7 @@ export class DestinationController { destination, version, requestMetadata, + loggerWithCtx, ); } catch (error: any) { resplist = events.map((ev) => { @@ -69,10 +71,7 @@ export class DestinationController { } ctx.body = resplist; ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Process-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Process-Transform):: Response from transformer::', ctx.body); stats.histogram('dest_transform_output_events', resplist.length, { destination, version, @@ -94,10 +93,7 @@ export class DestinationController { public static async destinationTransformAtRouter(ctx: Context) { const startTime = new Date(); - logger.debug( - 'Native(Router-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Router-Transform):: Requst to transformer::', ctx.request.body); const requestMetadata = MiscService.getRequestMetadata(ctx); const routerRequest = ctx.request.body as RouterTransformationRequest; const destination = routerRequest.destType; @@ -117,6 +113,9 @@ export class DestinationController { return ctx; } const metaTags = MiscService.getMetaTags(events[0].metadata); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); stats.histogram('dest_transform_input_events', events.length, { destination, version: 'v0', @@ -133,6 +132,7 @@ export class DestinationController { destination, getIntegrationVersion(), requestMetadata, + loggerWithCtx, ); } catch (error: any) { const metaTO = integrationService.getTags( @@ -155,10 +155,7 @@ export class DestinationController { version: 'v0', ...metaTags, }); - logger.debug( - 'Native(Router-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Router-Transform):: Response from transformer::', ctx.body); stats.timing('dest_transform_request_latency', startTime, { destination, version: 'v0', @@ -169,15 +166,15 @@ export class DestinationController { } public static batchProcess(ctx: Context) { - logger.debug( - 'Native(Process-Transform-Batch):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.info('Native(Process-Transform-Batch):: Requst to transformer::', ctx.request.body); const startTime = new Date(); const requestMetadata = MiscService.getRequestMetadata(ctx); const routerRequest = ctx.request.body as RouterTransformationRequest; const destination = routerRequest.destType; let events = routerRequest.input; + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(events[0]?.metadata as unknown as ErrorDetailer), + }); const integrationService = ServiceSelector.getDestinationService(events); try { events = DestinationPreTransformationService.preProcess(events, ctx); @@ -187,6 +184,7 @@ export class DestinationController { destination, getIntegrationVersion(), requestMetadata, + loggerWithCtx, ); ctx.body = resplist; } catch (error: any) { @@ -204,10 +202,7 @@ export class DestinationController { ctx.body = [errResp]; } ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Process-Transform-Batch):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Process-Transform-Batch):: Response from transformer::', ctx.body); stats.timing('dest_transform_request_latency', startTime, { destination, feature: tags.FEATURES.BATCH, diff --git a/src/controllers/regulation.ts b/src/controllers/regulation.ts index 318b5ed4e7..4b8f87e3fa 100644 --- a/src/controllers/regulation.ts +++ b/src/controllers/regulation.ts @@ -1,19 +1,16 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import logger from '../logger'; -import { UserDeletionRequest, UserDeletionResponse } from '../types'; import { ServiceSelector } from '../helpers/serviceSelector'; -import tags from '../v0/util/tags'; -import stats from '../util/stats'; import { DestinationPostTransformationService } from '../services/destination/postTransformation'; +import { UserDeletionRequest, UserDeletionResponse } from '../types'; +import stats from '../util/stats'; +import tags from '../v0/util/tags'; // eslint-disable-next-line @typescript-eslint/no-unused-vars import { CatchErr } from '../util/types'; export class RegulationController { public static async deleteUsers(ctx: Context) { - logger.debug( - 'Native(Process-Transform):: Requst to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Process-Transform):: Requst to transformer::', ctx.request.body); const startTime = new Date(); let rudderDestInfo: any; try { diff --git a/src/controllers/source.ts b/src/controllers/source.ts index ef5483a756..e1a4931371 100644 --- a/src/controllers/source.ts +++ b/src/controllers/source.ts @@ -1,20 +1,18 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { MiscService } from '../services/misc'; import { ServiceSelector } from '../helpers/serviceSelector'; -import { ControllerUtility } from './util'; -import logger from '../logger'; +import { MiscService } from '../services/misc'; import { SourcePostTransformationService } from '../services/source/postTransformation'; +import { ControllerUtility } from './util'; export class SourceController { public static async sourceTransform(ctx: Context) { - logger.debug( - 'Native(Source-Transform):: Request to transformer::', - JSON.stringify(ctx.request.body), - ); + logger.debug('Native(Source-Transform):: Request to transformer::', ctx.request.body); const requestMetadata = MiscService.getRequestMetadata(ctx); const events = ctx.request.body as object[]; const { version, source }: { version: string; source: string } = ctx.params; const integrationService = ServiceSelector.getNativeSourceService(); + const loggerWithCtx = logger.child({ version, source }); try { const { implementationVersion, input } = ControllerUtility.adaptInputToVersion( source, @@ -26,6 +24,7 @@ export class SourceController { source, implementationVersion, requestMetadata, + loggerWithCtx, ); ctx.body = resplist; } catch (err: any) { @@ -34,10 +33,7 @@ export class SourceController { ctx.body = [resp]; } ControllerUtility.postProcess(ctx); - logger.debug( - 'Native(Source-Transform):: Response from transformer::', - JSON.stringify(ctx.body), - ); + loggerWithCtx.debug('Native(Source-Transform):: Response from transformer::', ctx.body); return ctx; } } diff --git a/src/controllers/userTransform.ts b/src/controllers/userTransform.ts index 3e01686a52..0e288c6f04 100644 --- a/src/controllers/userTransform.ts +++ b/src/controllers/userTransform.ts @@ -1,10 +1,10 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { Context } from 'koa'; -import { ProcessorTransformationRequest, UserTransformationServiceResponse } from '../types/index'; import { UserTransformService } from '../services/userTransform'; -import logger from '../logger'; +import { ProcessorTransformationRequest, UserTransformationServiceResponse } from '../types/index'; import { - setupUserTransformHandler, extractLibraries, + setupUserTransformHandler, validateCode, } from '../util/customTransformer'; import { ControllerUtility } from './util'; @@ -13,7 +13,7 @@ export class UserTransformController { public static async transform(ctx: Context) { logger.debug( '(User transform - router:/customTransform ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const requestSize = Number(ctx.request.get('content-length')); const events = ctx.request.body as ProcessorTransformationRequest[]; @@ -23,7 +23,7 @@ export class UserTransformController { ControllerUtility.postProcess(ctx, processedRespone.retryStatus); logger.debug( '(User transform - router:/customTransform ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -31,7 +31,7 @@ export class UserTransformController { public static async testTransform(ctx: Context) { logger.debug( '(User transform - router:/transformation/test ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); const { events, trRevCode, libraryVersionIDs = [] } = ctx.request.body as any; const response = await UserTransformService.testTransformRoutine( @@ -43,7 +43,7 @@ export class UserTransformController { ControllerUtility.postProcess(ctx, response.status); logger.debug( '(User transform - router:/transformation/test ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -51,7 +51,7 @@ export class UserTransformController { public static async testTransformLibrary(ctx: Context) { logger.debug( '(User transform - router:/transformationLibrary/test ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { code, language = 'javascript' } = ctx.request.body as any; @@ -66,7 +66,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/transformationLibrary/test ):: Response from transformer', - JSON.stringify(ctx.response.body), + ctx.response.body, ); return ctx; } @@ -74,7 +74,7 @@ export class UserTransformController { public static async testTransformSethandle(ctx: Context) { logger.debug( '(User transform - router:/transformation/sethandle ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { trRevCode, libraryVersionIDs = [] } = ctx.request.body as any; @@ -96,7 +96,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/transformation/sethandle ):: Response from transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); return ctx; } @@ -104,7 +104,7 @@ export class UserTransformController { public static async extractLibhandle(ctx: Context) { logger.debug( '(User transform - router:/extractLibs ):: Request to transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); try { const { @@ -134,7 +134,7 @@ export class UserTransformController { } logger.debug( '(User transform - router:/extractLibs ):: Response from transformer', - JSON.stringify(ctx.request.body), + ctx.request.body, ); return ctx; } diff --git a/src/index.ts b/src/index.ts index 36f32f1aed..5557994b2e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,14 +1,14 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import dotenv from 'dotenv'; +import gracefulShutdown from 'http-graceful-shutdown'; import Koa from 'koa'; import bodyParser from 'koa-bodyparser'; -import gracefulShutdown from 'http-graceful-shutdown'; -import dotenv from 'dotenv'; -import logger from './logger'; -import cluster from './util/cluster'; +import { addRequestSizeMiddleware, addStatMiddleware, initPyroscope } from './middleware'; +import { addSwaggerRoutes, applicationRoutes } from './routes'; import { metricsRouter } from './routes/metricsRouter'; -import { addStatMiddleware, addRequestSizeMiddleware, initPyroscope } from './middleware'; -import { logProcessInfo } from './util/utils'; -import { applicationRoutes, addSwaggerRoutes } from './routes'; +import cluster from './util/cluster'; import { RedisDB } from './util/redis/redisConnector'; +import { logProcessInfo } from './util/utils'; dotenv.config(); const clusterEnabled = process.env.CLUSTER_ENABLED !== 'false'; diff --git a/src/interfaces/DestinationService.ts b/src/interfaces/DestinationService.ts index 4947089b5d..5d7596dac5 100644 --- a/src/interfaces/DestinationService.ts +++ b/src/interfaces/DestinationService.ts @@ -1,14 +1,14 @@ import { DeliveryV0Response, + DeliveryV1Response, MetaTransferObject, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, RouterTransformationRequestData, RouterTransformationResponse, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - DeliveryV1Response, } from '../types/index'; export interface DestinationService { @@ -28,6 +28,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): Promise; doRouterTransformation( @@ -35,6 +36,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): Promise; doBatchTransformation( @@ -42,6 +44,7 @@ export interface DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: NonNullable, ): RouterTransformationResponse[]; deliver( diff --git a/src/interfaces/SourceService.ts b/src/interfaces/SourceService.ts index c7de8cfe8b..fab6490264 100644 --- a/src/interfaces/SourceService.ts +++ b/src/interfaces/SourceService.ts @@ -1,4 +1,5 @@ import { MetaTransferObject, SourceTransformationResponse } from '../types/index'; +import { FixMe } from '../util/types'; export interface SourceService { getTags(): MetaTransferObject; @@ -8,5 +9,6 @@ export interface SourceService { sourceType: string, version: string, requestMetadata: NonNullable, + logger: FixMe, ): Promise; } diff --git a/src/services/comparator.ts b/src/services/comparator.ts index 36cb0ebd5a..511436dfd1 100644 --- a/src/services/comparator.ts +++ b/src/services/comparator.ts @@ -1,4 +1,5 @@ /* eslint-disable class-methods-use-this */ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import { DestinationService } from '../interfaces/DestinationService'; import { DeliveryV0Response, @@ -14,10 +15,9 @@ import { UserDeletionRequest, UserDeletionResponse, } from '../types'; -import tags from '../v0/util/tags'; -import stats from '../util/stats'; -import logger from '../logger'; import { CommonUtils } from '../util/common'; +import stats from '../util/stats'; +import tags from '../v0/util/tags'; const NS_PER_SEC = 1e9; @@ -204,6 +204,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + logger, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; @@ -262,6 +263,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + logger, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; @@ -320,6 +322,7 @@ export class ComparatorService implements DestinationService { destinationType, version, requestMetadata, + {}, ); const primaryTimeDiff = process.hrtime(primaryStartTime); const primaryTime = primaryTimeDiff[0] * NS_PER_SEC + primaryTimeDiff[1]; diff --git a/src/services/destination/__tests__/nativeIntegration.test.ts b/src/services/destination/__tests__/nativeIntegration.test.ts index 59c8b41881..85d099d292 100644 --- a/src/services/destination/__tests__/nativeIntegration.test.ts +++ b/src/services/destination/__tests__/nativeIntegration.test.ts @@ -1,11 +1,12 @@ -import { NativeIntegrationDestinationService } from '../nativeIntegration'; -import { DestinationPostTransformationService } from '../postTransformation'; +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import { FetchHandler } from '../../../helpers/fetchHandlers'; import { - ProcessorTransformationRequest, ProcessorTransformationOutput, + ProcessorTransformationRequest, ProcessorTransformationResponse, } from '../../../types/index'; -import { FetchHandler } from '../../../helpers/fetchHandlers'; +import { NativeIntegrationDestinationService } from '../nativeIntegration'; +import { DestinationPostTransformationService } from '../postTransformation'; afterEach(() => { jest.clearAllMocks(); @@ -47,6 +48,7 @@ describe('NativeIntegration Service', () => { destType, version, requestMetadata, + logger, ); expect(resp).toEqual(tresponse); @@ -77,6 +79,7 @@ describe('NativeIntegration Service', () => { destType, version, requestMetadata, + logger, ); const expected = [ diff --git a/src/services/destination/cdkV2Integration.ts b/src/services/destination/cdkV2Integration.ts index c18a5cd936..a649da9154 100644 --- a/src/services/destination/cdkV2Integration.ts +++ b/src/services/destination/cdkV2Integration.ts @@ -1,27 +1,28 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable class-methods-use-this */ -import groupBy from 'lodash/groupBy'; import { TransformationError } from '@rudderstack/integrations-lib'; +import groupBy from 'lodash/groupBy'; import { processCdkV2Workflow } from '../../cdk/v2/handler'; import { DestinationService } from '../../interfaces/DestinationService'; import { DeliveryV0Response, + DeliveryV1Response, ErrorDetailer, MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, RouterTransformationRequestData, RouterTransformationResponse, - ProcessorTransformationOutput, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - DeliveryV1Response, } from '../../types/index'; +import stats from '../../util/stats'; +import { CatchErr, FixMe } from '../../util/types'; import tags from '../../v0/util/tags'; +import { MiscService } from '../misc'; import { DestinationPostTransformationService } from './postTransformation'; -import stats from '../../util/stats'; -import { CatchErr } from '../../util/types'; export class CDKV2DestinationService implements DestinationService { public init() {} @@ -55,10 +56,19 @@ export class CDKV2DestinationService implements DestinationService { destinationType: string, _version: string, requestMetadata: NonNullable, + logger: any, ): Promise { // TODO: Change the promise type const respList: ProcessorTransformationResponse[][] = await Promise.all( events.map(async (event) => { + const metaTo = this.getTags( + destinationType, + event.metadata.destinationId, + event.metadata.workspaceId, + tags.FEATURES.PROCESSOR, + ); + metaTo.metadata = event.metadata; + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTo.errorDetails) }); try { const transformedPayloads: | ProcessorTransformationOutput @@ -66,9 +76,9 @@ export class CDKV2DestinationService implements DestinationService { destinationType, event, tags.FEATURES.PROCESSOR, + loggerWithCtx, requestMetadata, ); - stats.increment('event_transform_success', { destType: destinationType, module: tags.MODULES.DESTINATION, @@ -85,13 +95,6 @@ export class CDKV2DestinationService implements DestinationService { undefined, ); } catch (error: CatchErr) { - const metaTo = this.getTags( - destinationType, - event.metadata.destinationId, - event.metadata.workspaceId, - tags.FEATURES.PROCESSOR, - ); - metaTo.metadata = event.metadata; const erroredResp = DestinationPostTransformationService.handleProcessorTransformFailureEvents( error, @@ -112,6 +115,7 @@ export class CDKV2DestinationService implements DestinationService { destinationType: string, _version: string, requestMetadata: NonNullable, + logger: FixMe, ): Promise { const allDestEvents: object = groupBy( events, @@ -127,12 +131,16 @@ export class CDKV2DestinationService implements DestinationService { tags.FEATURES.ROUTER, ); metaTo.metadata = destInputArray[0].metadata; + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTo.errorDetails), + }); try { const doRouterTransformationResponse: RouterTransformationResponse[] = await processCdkV2Workflow( destinationType, destInputArray, tags.FEATURES.ROUTER, + loggerWithCtx, requestMetadata, ); return DestinationPostTransformationService.handleRouterTransformSuccessEvents( diff --git a/src/services/destination/nativeIntegration.ts b/src/services/destination/nativeIntegration.ts index 2bb82fc602..0bc9308fcd 100644 --- a/src/services/destination/nativeIntegration.ts +++ b/src/services/destination/nativeIntegration.ts @@ -1,31 +1,32 @@ /* eslint-disable prefer-destructuring */ /* eslint-disable sonarjs/no-duplicate-string */ /* eslint-disable @typescript-eslint/no-unused-vars */ -import groupBy from 'lodash/groupBy'; import cloneDeep from 'lodash/cloneDeep'; +import groupBy from 'lodash/groupBy'; +import networkHandlerFactory from '../../adapters/networkHandlerFactory'; +import { FetchHandler } from '../../helpers/fetchHandlers'; import { DestinationService } from '../../interfaces/DestinationService'; import { + DeliveryJobState, DeliveryV0Response, + DeliveryV1Response, ErrorDetailer, MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, + ProxyRequest, + ProxyV0Request, + ProxyV1Request, RouterTransformationRequestData, RouterTransformationResponse, - ProcessorTransformationOutput, UserDeletionRequest, UserDeletionResponse, - ProxyRequest, - ProxyV0Request, - ProxyV1Request, - DeliveryV1Response, - DeliveryJobState, } from '../../types/index'; -import { DestinationPostTransformationService } from './postTransformation'; -import networkHandlerFactory from '../../adapters/networkHandlerFactory'; -import { FetchHandler } from '../../helpers/fetchHandlers'; -import tags from '../../v0/util/tags'; import stats from '../../util/stats'; +import tags from '../../v0/util/tags'; +import { MiscService } from '../misc'; +import { DestinationPostTransformationService } from './postTransformation'; export class NativeIntegrationDestinationService implements DestinationService { public init() {} @@ -59,27 +60,33 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: any, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); const respList: ProcessorTransformationResponse[][] = await Promise.all( events.map(async (event) => { + const metaTO = this.getTags( + destinationType, + event.metadata?.destinationId, + event.metadata?.workspaceId, + tags.FEATURES.PROCESSOR, + ); + metaTO.metadata = event.metadata; + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTO.errorDetails) }); try { const transformedPayloads: | ProcessorTransformationOutput - | ProcessorTransformationOutput[] = await destHandler.process(event, requestMetadata); + | ProcessorTransformationOutput[] = await destHandler.process( + event, + requestMetadata, + loggerWithCtx, + ); return DestinationPostTransformationService.handleProcessorTransformSucessEvents( event, transformedPayloads, destHandler, ); } catch (error: any) { - const metaTO = this.getTags( - destinationType, - event.metadata?.destinationId, - event.metadata?.workspaceId, - tags.FEATURES.PROCESSOR, - ); - metaTO.metadata = event.metadata; const erroredResp = DestinationPostTransformationService.handleProcessorTransformFailureEvents( error, @@ -97,6 +104,7 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: string, requestMetadata: NonNullable, + logger: any, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); const allDestEvents: NonNullable = groupBy( @@ -112,9 +120,16 @@ export class NativeIntegrationDestinationService implements DestinationService { destInputArray[0].metadata?.workspaceId, tags.FEATURES.ROUTER, ); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTO.errorDetails), + }); try { const doRouterTransformationResponse: RouterTransformationResponse[] = - await destHandler.processRouterDest(cloneDeep(destInputArray), requestMetadata); + await destHandler.processRouterDest( + cloneDeep(destInputArray), + requestMetadata, + loggerWithCtx, + ); metaTO.metadata = destInputArray[0].metadata; return DestinationPostTransformationService.handleRouterTransformSuccessEvents( doRouterTransformationResponse, @@ -141,6 +156,7 @@ export class NativeIntegrationDestinationService implements DestinationService { destinationType: string, version: any, requestMetadata: NonNullable, + logger: any, ): RouterTransformationResponse[] { const destHandler = FetchHandler.getDestHandler(destinationType, version); if (!destHandler.batch) { @@ -152,20 +168,24 @@ export class NativeIntegrationDestinationService implements DestinationService { ); const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents); const response = groupedEvents.map((destEvents) => { + const metaTO = this.getTags( + destinationType, + destEvents[0].metadata.destinationId, + destEvents[0].metadata.workspaceId, + tags.FEATURES.BATCH, + ); + metaTO.metadatas = events.map((event) => event.metadata); + const loggerWithCtx = logger.child({ + ...MiscService.getLoggableData(metaTO.errorDetails), + }); try { const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch( destEvents, requestMetadata, + loggerWithCtx, ); return destBatchedRequests; } catch (error: any) { - const metaTO = this.getTags( - destinationType, - destEvents[0].metadata.destinationId, - destEvents[0].metadata.workspaceId, - tags.FEATURES.BATCH, - ); - metaTO.metadatas = events.map((event) => event.metadata); const errResp = DestinationPostTransformationService.handleBatchTransformFailureEvents( error, metaTO, @@ -264,6 +284,7 @@ export class NativeIntegrationDestinationService implements DestinationService { error: `${destType}: Doesn't support deletion of users`, } as UserDeletionResponse; } + const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION); try { const result: UserDeletionResponse = await destUserDeletionHandler.processDeleteUsers({ ...request, @@ -276,7 +297,6 @@ export class NativeIntegrationDestinationService implements DestinationService { }); return result; } catch (error: any) { - const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION); return DestinationPostTransformationService.handleUserDeletionFailureEvents( error, metaTO, diff --git a/src/services/destination/postTransformation.ts b/src/services/destination/postTransformation.ts index 161547683b..40cee61e66 100644 --- a/src/services/destination/postTransformation.ts +++ b/src/services/destination/postTransformation.ts @@ -1,24 +1,30 @@ /* eslint-disable no-param-reassign */ +import { PlatformError } from '@rudderstack/integrations-lib'; import cloneDeep from 'lodash/cloneDeep'; -import isObject from 'lodash/isObject'; import isEmpty from 'lodash/isEmpty'; -import { PlatformError } from '@rudderstack/integrations-lib'; +import isObject from 'lodash/isObject'; import { + DeliveryJobState, + DeliveryV0Response, + DeliveryV1Response, + MetaTransferObject, + ProcessorTransformationOutput, ProcessorTransformationRequest, ProcessorTransformationResponse, RouterTransformationResponse, - ProcessorTransformationOutput, - DeliveryV0Response, - MetaTransferObject, UserDeletionResponse, - DeliveryV1Response, - DeliveryJobState, } from '../../types/index'; -import { generateErrorObject } from '../../v0/util'; -import { ErrorReportingService } from '../errorReporting'; -import tags from '../../v0/util/tags'; import stats from '../../util/stats'; import { FixMe } from '../../util/types'; +import { generateErrorObject } from '../../v0/util'; +import tags from '../../v0/util/tags'; +import { ErrorReportingService } from '../errorReporting'; +import { MiscService } from '../misc'; + +const defaultErrorMessages = { + router: '[Router Transform] Error occurred while processing the payload.', + delivery: '[Delivery] Error occured while processing payload', +} as const; export class DestinationPostTransformationService { public static handleProcessorTransformSucessEvents( @@ -62,6 +68,10 @@ export class DestinationPostTransformationService { error: errObj.message || '[Processor Transform] Error occurred while processing the payload.', statTags: errObj.statTags, } as ProcessorTransformationResponse; + MiscService.logError( + errObj.message || '[Processor Transform] Error occurred while processing the payload.', + metaTo.errorDetails, + ); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -99,6 +109,7 @@ export class DestinationPostTransformationService { ...resp.statTags, ...metaTo.errorDetails, }; + MiscService.logError(resp.error || defaultErrorMessages.router, metaTo.errorDetails); stats.increment('event_transform_failure', metaTo.errorDetails); } else { stats.increment('event_transform_success', { @@ -124,9 +135,10 @@ export class DestinationPostTransformationService { metadata: metaTo.metadatas, batched: false, statusCode: errObj.status, - error: errObj.message || '[Router Transform] Error occurred while processing the payload.', + error: errObj.message || defaultErrorMessages.router, statTags: errObj.statTags, } as RouterTransformationResponse; + MiscService.logError(errObj.message || defaultErrorMessages.router, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); stats.increment('event_transform_failure', metaTo.errorDetails); return resp; @@ -141,9 +153,10 @@ export class DestinationPostTransformationService { metadata: metaTo.metadatas, batched: false, statusCode: 500, // for batch we should consider code error hence keeping retryable - error: errObj.message || '[Batch Transform] Error occurred while processing payload.', + error: errObj.message || defaultErrorMessages.delivery, statTags: errObj.statTags, } as RouterTransformationResponse; + MiscService.logError(error as string, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -174,6 +187,10 @@ export class DestinationPostTransformationService { const errObj = generateErrorObject(error, metaTo.errorDetails, false); const metadataArray = metaTo.metadatas; if (!Array.isArray(metadataArray)) { + MiscService.logError( + 'Proxy v1 endpoint error : metadataArray is not an array', + metaTo.errorDetails, + ); // Panic throw new PlatformError('Proxy v1 endpoint error : metadataArray is not an array'); } @@ -182,7 +199,7 @@ export class DestinationPostTransformationService { error: JSON.stringify(error.destinationResponse?.response) || errObj.message || - '[Delivery] Error occured while processing payload', + defaultErrorMessages.delivery, statusCode: errObj.status, metadata, } as DeliveryJobState; @@ -198,7 +215,7 @@ export class DestinationPostTransformationService { authErrorCategory: errObj.authErrorCategory, }), } as DeliveryV1Response; - + MiscService.logError(errObj.message, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } @@ -216,6 +233,7 @@ export class DestinationPostTransformationService { authErrorCategory: errObj.authErrorCategory, }), } as UserDeletionResponse; + MiscService.logError(errObj.message, metaTo.errorDetails); ErrorReportingService.reportError(error, metaTo.errorContext, resp); return resp; } diff --git a/src/services/misc.ts b/src/services/misc.ts index e0953d08bf..3df1196c1d 100644 --- a/src/services/misc.ts +++ b/src/services/misc.ts @@ -1,10 +1,11 @@ /* eslint-disable global-require, import/no-dynamic-require */ +import { LoggableExtraData, structuredLogger as logger } from '@rudderstack/integrations-lib'; import fs from 'fs'; -import path from 'path'; import { Context } from 'koa'; +import path from 'path'; import { DestHandlerMap } from '../constants/destinationCanonicalNames'; -import { Metadata } from '../types'; import { getCPUProfile, getHeapProfile } from '../middleware'; +import { ErrorDetailer, Metadata } from '../types'; export class MiscService { public static getDestHandler(dest: string, version: string) { @@ -74,4 +75,21 @@ export class MiscService { public static async getHeapProfile() { return getHeapProfile(); } + + public static getLoggableData(errorDetailer: ErrorDetailer): Partial { + return { + ...(errorDetailer?.destinationId && { destinationId: errorDetailer.destinationId }), + ...(errorDetailer?.sourceId && { sourceId: errorDetailer.sourceId }), + ...(errorDetailer?.workspaceId && { workspaceId: errorDetailer.workspaceId }), + ...(errorDetailer?.destType && { destType: errorDetailer.destType }), + ...(errorDetailer?.module && { module: errorDetailer.module }), + ...(errorDetailer?.implementation && { implementation: errorDetailer.implementation }), + ...(errorDetailer?.feature && { feature: errorDetailer.feature }), + }; + } + + public static logError(message: string, errorDetailer: ErrorDetailer) { + const loggableExtraData: Partial = this.getLoggableData(errorDetailer); + logger.errorw(message || '', loggableExtraData); + } } diff --git a/src/services/source/__tests__/nativeIntegration.test.ts b/src/services/source/__tests__/nativeIntegration.test.ts index bb40438811..77e355fd1a 100644 --- a/src/services/source/__tests__/nativeIntegration.test.ts +++ b/src/services/source/__tests__/nativeIntegration.test.ts @@ -1,8 +1,9 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; +import { FetchHandler } from '../../../helpers/fetchHandlers'; +import { RudderMessage, SourceTransformationResponse } from '../../../types/index'; +import stats from '../../../util/stats'; import { NativeIntegrationSourceService } from '../nativeIntegration'; import { SourcePostTransformationService } from '../postTransformation'; -import { SourceTransformationResponse, RudderMessage } from '../../../types/index'; -import stats from '../../../util/stats'; -import { FetchHandler } from '../../../helpers/fetchHandlers'; afterEach(() => { jest.clearAllMocks(); @@ -43,7 +44,13 @@ describe('NativeIntegration Source Service', () => { }); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const resp = await service.sourceTransformRoutine( + events, + sourceType, + version, + requestMetadata, + logger, + ); expect(resp).toEqual(tresponse); @@ -80,7 +87,13 @@ describe('NativeIntegration Source Service', () => { jest.spyOn(stats, 'increment').mockImplementation(() => {}); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const resp = await service.sourceTransformRoutine( + events, + sourceType, + version, + requestMetadata, + logger, + ); expect(resp).toEqual(tresponse); diff --git a/src/services/source/nativeIntegration.ts b/src/services/source/nativeIntegration.ts index 6eaef2f835..2ecfc30066 100644 --- a/src/services/source/nativeIntegration.ts +++ b/src/services/source/nativeIntegration.ts @@ -1,3 +1,4 @@ +import { FetchHandler } from '../../helpers/fetchHandlers'; import { SourceService } from '../../interfaces/SourceService'; import { ErrorDetailer, @@ -5,11 +6,11 @@ import { RudderMessage, SourceTransformationResponse, } from '../../types/index'; +import stats from '../../util/stats'; import { FixMe } from '../../util/types'; -import { SourcePostTransformationService } from './postTransformation'; -import { FetchHandler } from '../../helpers/fetchHandlers'; import tags from '../../v0/util/tags'; -import stats from '../../util/stats'; +import { MiscService } from '../misc'; +import { SourcePostTransformationService } from './postTransformation'; export class NativeIntegrationSourceService implements SourceService { public getTags(): MetaTransferObject { @@ -31,20 +32,23 @@ export class NativeIntegrationSourceService implements SourceService { version: string, // eslint-disable-next-line @typescript-eslint/no-unused-vars _requestMetadata: NonNullable, + logger: FixMe, ): Promise { const sourceHandler = FetchHandler.getSourceHandler(sourceType, version); + const metaTO = this.getTags(); + const loggerWithCtx = logger.child({ ...MiscService.getLoggableData(metaTO.errorDetails) }); const respList: SourceTransformationResponse[] = await Promise.all( sourceEvents.map(async (sourceEvent) => { try { const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = - await sourceHandler.process(sourceEvent); + await sourceHandler.process(sourceEvent, loggerWithCtx); return SourcePostTransformationService.handleSuccessEventsSource(respEvents); } catch (error: FixMe) { - const metaTO = this.getTags(); stats.increment('source_transform_errors', { source: sourceType, version, }); + logger.debug('Error during source Transform', error); return SourcePostTransformationService.handleFailureEventsSource(error, metaTO); } }), diff --git a/src/util/redis/redisConnector.test.js b/src/util/redis/redisConnector.test.js index e0491132ff..7cf2ccbbcf 100644 --- a/src/util/redis/redisConnector.test.js +++ b/src/util/redis/redisConnector.test.js @@ -2,6 +2,7 @@ const fs = require('fs'); const path = require('path'); const version = 'v0'; const { RedisDB } = require('./redisConnector'); +const { structuredLogger: logger } = require('@rudderstack/integrations-lib'); jest.mock('ioredis', () => require('../../../test/__mocks__/redis')); const sourcesList = ['shopify']; process.env.USE_REDIS_DB = 'true'; @@ -54,7 +55,7 @@ describe(`Source Tests`, () => { data.forEach((dataPoint, index) => { it(`${index}. ${source} - ${dataPoint.description}`, async () => { try { - const output = await transformer.process(dataPoint.input); + const output = await transformer.process(dataPoint.input, logger); expect(output).toEqual(dataPoint.output); } catch (error) { expect(error.message).toEqual(dataPoint.output.error); diff --git a/src/v0/destinations/campaign_manager/transform.js b/src/v0/destinations/campaign_manager/transform.js index 14bc6d2c19..403a79a971 100644 --- a/src/v0/destinations/campaign_manager/transform.js +++ b/src/v0/destinations/campaign_manager/transform.js @@ -243,7 +243,8 @@ const batchEvents = (eventChunksArray) => { return batchedResponseList; }; -const processRouterDest = async (inputs, reqMetadata) => { +const processRouterDest = async (inputs, reqMetadata, logger) => { + logger.debug(`Transformation router request received with size ${inputs.length}`); const batchErrorRespList = []; const eventChunksArray = []; const { destination } = inputs[0]; diff --git a/src/v0/destinations/mailchimp/utils.js b/src/v0/destinations/mailchimp/utils.js index 1f4fc03ee5..a726f23a39 100644 --- a/src/v0/destinations/mailchimp/utils.js +++ b/src/v0/destinations/mailchimp/utils.js @@ -1,9 +1,12 @@ const get = require('get-value'); const md5 = require('md5'); -const { InstrumentationError, NetworkError } = require('@rudderstack/integrations-lib'); +const { + InstrumentationError, + NetworkError, + structuredLogger: logger, +} = require('@rudderstack/integrations-lib'); const myAxios = require('../../../util/myAxios'); const { MappedToDestinationKey } = require('../../../constants'); -const logger = require('../../../logger'); const { isDefinedAndNotNull, isDefined, diff --git a/src/v0/destinations/twitter_ads/transform.js b/src/v0/destinations/twitter_ads/transform.js index 268dca3636..365663925e 100644 --- a/src/v0/destinations/twitter_ads/transform.js +++ b/src/v0/destinations/twitter_ads/transform.js @@ -156,7 +156,8 @@ function validateRequest(message) { } } -function process(event) { +function process(event, requestMetadata, logger) { + logger.info(`[TWITTER ADS]: Transforming request received with info`); const { message, metadata, destination } = event; validateRequest(message); diff --git a/src/v0/sources/canny/transform.js b/src/v0/sources/canny/transform.js index 9188f5ac34..38ed5e137e 100644 --- a/src/v0/sources/canny/transform.js +++ b/src/v0/sources/canny/transform.js @@ -2,7 +2,6 @@ const sha256 = require('sha256'); const { TransformationError } = require('@rudderstack/integrations-lib'); const Message = require('../message'); const { voterMapping, authorMapping, checkForRequiredFields } = require('./util'); -const { logger } = require('../../../logger'); const CannyOperation = { VOTE_CREATED: 'vote.created', @@ -15,7 +14,7 @@ const CannyOperation = { * @param {*} event * @param {*} typeOfUser */ -function settingIds(message, event, typeOfUser) { +function settingIds(message, event, typeOfUser, logger) { const clonedMessage = { ...message }; try { // setting up userId @@ -48,7 +47,7 @@ function settingIds(message, event, typeOfUser) { * @param {*} typeOfUser * @returns message */ -function createMessage(event, typeOfUser) { +function createMessage(event, typeOfUser, logger) { const message = new Message(`Canny`); message.setEventType('track'); @@ -61,7 +60,7 @@ function createMessage(event, typeOfUser) { message.context.integration.version = '1.0.0'; - const finalMessage = settingIds(message, event, typeOfUser); + const finalMessage = settingIds(message, event, typeOfUser, logger); checkForRequiredFields(finalMessage); @@ -73,7 +72,7 @@ function createMessage(event, typeOfUser) { return finalMessage; } -function process(event) { +function process(event, logger) { let typeOfUser; switch (event.type) { @@ -86,6 +85,6 @@ function process(event) { typeOfUser = 'author'; } - return createMessage(event, typeOfUser); + return createMessage(event, typeOfUser, logger); } module.exports = { process }; diff --git a/src/v0/sources/shopify/transform.js b/src/v0/sources/shopify/transform.js index b6d2a4c6cf..4886fb3df1 100644 --- a/src/v0/sources/shopify/transform.js +++ b/src/v0/sources/shopify/transform.js @@ -16,7 +16,6 @@ const { const { RedisDB } = require('../../../util/redis/redisConnector'); const { removeUndefinedAndNullValues, isDefinedAndNotNull } = require('../../util'); const Message = require('../message'); -const logger = require('../../../logger'); const { EventType } = require('../../../constants'); const { INTEGERATION, @@ -206,7 +205,7 @@ const processEvent = async (inputEvent, metricMetadata) => { }; const isIdentifierEvent = (event) => ['rudderIdentifier', 'rudderSessionIdentifier'].includes(event?.event); -const processIdentifierEvent = async (event, metricMetadata) => { +const processIdentifierEvent = async (event, metricMetadata, logger) => { if (useRedisDatabase) { let value; let field; @@ -256,13 +255,13 @@ const processIdentifierEvent = async (event, metricMetadata) => { } return NO_OPERATION_SUCCESS; }; -const process = async (event) => { +const process = async (event, logger) => { const metricMetadata = { writeKey: event.query_parameters?.writeKey?.[0], source: 'SHOPIFY', }; if (isIdentifierEvent(event)) { - return processIdentifierEvent(event, metricMetadata); + return processIdentifierEvent(event, metricMetadata, logger); } const response = await processEvent(event, metricMetadata); return response; diff --git a/src/v0/sources/shopify/util.js b/src/v0/sources/shopify/util.js index c4bbb61b9c..3dc54cc434 100644 --- a/src/v0/sources/shopify/util.js +++ b/src/v0/sources/shopify/util.js @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/naming-convention */ const { v5 } = require('uuid'); const sha256 = require('sha256'); -const { TransformationError } = require('@rudderstack/integrations-lib'); +const { TransformationError, structuredLogger: logger } = require('@rudderstack/integrations-lib'); const stats = require('../../../util/stats'); const { constructPayload, @@ -12,7 +12,6 @@ const { isDefinedAndNotNull, } = require('../../util'); const { RedisDB } = require('../../../util/redis/redisConnector'); -const logger = require('../../../logger'); const { lineItemsMappingJSON, productMappingJSON, diff --git a/test/__tests__/pinterestConversion-cdk.test.ts b/test/__tests__/pinterestConversion-cdk.test.ts index f4da92eea9..6aaa710ed7 100644 --- a/test/__tests__/pinterestConversion-cdk.test.ts +++ b/test/__tests__/pinterestConversion-cdk.test.ts @@ -1,6 +1,7 @@ +import { structuredLogger as logger } from '@rudderstack/integrations-lib'; import fs from 'fs'; import path from 'path'; -import { processCdkV2Workflow, getWorkflowEngine, executeWorkflow } from '../../src/cdk/v2/handler'; +import { executeWorkflow, getWorkflowEngine, processCdkV2Workflow } from '../../src/cdk/v2/handler'; import tags from '../../src/v0/util/tags'; const integration = 'pinterest_tag'; @@ -22,7 +23,12 @@ describe(`${name} Tests`, () => { it(`${name} - payload: ${index}`, async () => { const expected = expectedData[index]; try { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.PROCESSOR); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.PROCESSOR, + logger, + ); expect(output).toEqual(expected); } catch (error: any) { expect(error.message).toEqual(expected.error); @@ -46,7 +52,12 @@ describe(`${name} Tests`, () => { it(`${name} - payload: ${index}`, async () => { const expected = expectedData[index]; try { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.PROCESSOR); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.PROCESSOR, + logger, + ); expect(output).toEqual(expected); } catch (error: any) { expect(error.message).toEqual(expected.error); @@ -91,6 +102,7 @@ describe(`${name} Tests`, () => { integration, inputRouterErrorData, tags.FEATURES.ROUTER, + logger, ); expect(output).toEqual(expectedRouterErrorData); }); @@ -98,7 +110,12 @@ describe(`${name} Tests`, () => { describe('Default Batch size', () => { inputRouterData.forEach((input, index) => { it(`Payload: ${index}`, async () => { - const output = await processCdkV2Workflow(integration, input, tags.FEATURES.ROUTER); + const output = await processCdkV2Workflow( + integration, + input, + tags.FEATURES.ROUTER, + logger, + ); expect(output).toEqual(expectedRouterData[index]); }); }); diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index 266619b6ac..e46357f824 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -1,13 +1,13 @@ import fs from 'fs'; -import path from 'path'; -import request from 'supertest'; import { createHttpTerminator } from 'http-terminator'; import Koa from 'koa'; import bodyParser from 'koa-bodyparser'; +import path from 'path'; import setValue from 'set-value'; -import { applicationRoutes } from '../../src/routes'; -import { FetchHandler } from '../../src/helpers/fetchHandlers'; +import request from 'supertest'; import networkHandlerFactory from '../../src/adapters/networkHandlerFactory'; +import { FetchHandler } from '../../src/helpers/fetchHandlers'; +import { applicationRoutes } from '../../src/routes'; let server: any; const OLD_ENV = process.env;