Skip to content

Commit

Permalink
feat: logging initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
aashishmalik committed Feb 13, 2024
1 parent d522b35 commit 1198ae1
Show file tree
Hide file tree
Showing 16 changed files with 129 additions and 45 deletions.
11 changes: 10 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
"ua-parser-js": "^1.0.37",
"unset-value": "^2.0.1",
"uuid": "^9.0.0",
"valid-url": "^1.0.9"
"valid-url": "^1.0.9",
"winston": "^3.11.0"
},
"devDependencies": {
"@commitlint/config-conventional": "^17.6.3",
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/bulkUpload.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable global-require, import/no-dynamic-require, @typescript-eslint/no-unused-vars */
import { client as errNotificationClient } from '../util/errorNotifier';
import logger from '../logger';
import logger from '@rudderstack/integrations-lib';
import { CatchErr, ContextBodySimple } from '../util/types';
// TODO: To be refactored and redisgned

Expand Down
2 changes: 1 addition & 1 deletion src/controllers/delivery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import { ServiceSelector } from '../helpers/serviceSelector';
import { DeliveryTestService } from '../services/delivertTest/deliveryTest';
import { ControllerUtility } from './util';
import logger from '../logger';
import logger from '@rudderstack/integrations-lib';
import { DestinationPostTransformationService } from '../services/destination/postTransformation';
import tags from '../v0/util/tags';
import { FixMe } from '../util/types';
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import { ServiceSelector } from '../helpers/serviceSelector';
import { ControllerUtility } from './util';
import stats from '../util/stats';
import logger from '../logger';
import logger from '@rudderstack/integrations-lib';
import { getIntegrationVersion } from '../util/utils';
import tags from '../v0/util/tags';
import { DynamicConfigParser } from '../util/dynamicConfigParser';
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/regulation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Context } from 'koa';
import logger from '../logger';
import logger from '@rudderstack/integrations-lib';
import { UserDeletionRequest, UserDeletionResponse } from '../types';
import { ServiceSelector } from '../helpers/serviceSelector';
import tags from '../v0/util/tags';
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Context } from 'koa';
import { MiscService } from '../services/misc';
import { ServiceSelector } from '../helpers/serviceSelector';
import { ControllerUtility } from './util';
import logger from '../logger';
import logger from '@rudderstack/integrations-lib';
import { SourcePostTransformationService } from '../services/source/postTransformation';

export class SourceController {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Koa from 'koa';
import bodyParser from 'koa-bodyparser';
import gracefulShutdown from 'http-graceful-shutdown';
import dotenv from 'dotenv';
import logger from './logger';
import logger from '@rudderstack/integrations-lib';
import cluster from './util/cluster';
import { metricsRouter } from './routes/metricsRouter';
import { addStatMiddleware, addRequestSizeMiddleware, initPyroscope } from './middleware';
Expand Down
27 changes: 18 additions & 9 deletions src/services/destination/cdkV2Integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import tags from '../../v0/util/tags';
import { DestinationPostTransformationService } from './postTransformation';
import stats from '../../util/stats';
import { CatchErr } from '../../util/types';
import { MiscService } from '../misc';

export class CDKV2DestinationService implements DestinationService {
public init() {}
Expand Down Expand Up @@ -59,14 +60,25 @@ export class CDKV2DestinationService implements DestinationService {
// TODO: Change the promise type
const respList: ProcessorTransformationResponse[][] = await Promise.all(
events.map(async (event) => {
const metaTo = this.getTags(
destinationType,
event.metadata.destinationId,
event.metadata.workspaceId,
tags.FEATURES.PROCESSOR,
);
metaTo.metadata = event.metadata;
const metadataWithSvcCtx = {
...requestMetadata,
serviceContext: MiscService.getLoggableData(metaTo.errorDetails),
};
try {
const transformedPayloads:
| ProcessorTransformationOutput
| ProcessorTransformationOutput[] = await processCdkV2Workflow(
destinationType,
event,
tags.FEATURES.PROCESSOR,
requestMetadata,
metadataWithSvcCtx,
);

stats.increment('event_transform_success', {
Expand All @@ -85,13 +97,6 @@ export class CDKV2DestinationService implements DestinationService {
undefined,
);
} catch (error: CatchErr) {
const metaTo = this.getTags(
destinationType,
event.metadata.destinationId,
event.metadata.workspaceId,
tags.FEATURES.PROCESSOR,
);
metaTo.metadata = event.metadata;
const erroredResp =
DestinationPostTransformationService.handleProcessorTransformFailureEvents(
error,
Expand Down Expand Up @@ -127,13 +132,17 @@ export class CDKV2DestinationService implements DestinationService {
tags.FEATURES.ROUTER,
);
metaTo.metadata = destInputArray[0].metadata;
const metadataWithSvcCtx = {
...requestMetadata,
serviceContext: MiscService.getLoggableData(metaTo.errorDetails),
};
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await processCdkV2Workflow(
destinationType,
destInputArray,
tags.FEATURES.ROUTER,
requestMetadata,
metadataWithSvcCtx,
);
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
Expand Down
53 changes: 35 additions & 18 deletions src/services/destination/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import networkHandlerFactory from '../../adapters/networkHandlerFactory';
import { FetchHandler } from '../../helpers/fetchHandlers';
import tags from '../../v0/util/tags';
import stats from '../../util/stats';
import { MiscService } from '../misc';

export class NativeIntegrationDestinationService implements DestinationService {
public init() {}
Expand Down Expand Up @@ -63,23 +64,30 @@ export class NativeIntegrationDestinationService implements DestinationService {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const respList: ProcessorTransformationResponse[][] = await Promise.all(
events.map(async (event) => {
const metaTO = this.getTags(
destinationType,
event.metadata?.destinationId,
event.metadata?.workspaceId,
tags.FEATURES.PROCESSOR,
);
metaTO.metadata = event.metadata;
const metadataWithSvcCtx = {
...requestMetadata,
serviceContext: MiscService.getLoggableData(metaTO.errorDetails),
};
try {
const transformedPayloads:
| ProcessorTransformationOutput
| ProcessorTransformationOutput[] = await destHandler.process(event, requestMetadata);
| ProcessorTransformationOutput[] = await destHandler.process(
event,
metadataWithSvcCtx,
);
return DestinationPostTransformationService.handleProcessorTransformSucessEvents(
event,
transformedPayloads,
destHandler,
);
} catch (error: any) {
const metaTO = this.getTags(
destinationType,
event.metadata?.destinationId,
event.metadata?.workspaceId,
tags.FEATURES.PROCESSOR,
);
metaTO.metadata = event.metadata;
const erroredResp =
DestinationPostTransformationService.handleProcessorTransformFailureEvents(
error,
Expand Down Expand Up @@ -113,8 +121,12 @@ export class NativeIntegrationDestinationService implements DestinationService {
tags.FEATURES.ROUTER,
);
try {
const metadataWithSvcCtx = {
...requestMetadata,
serviceContext: MiscService.getLoggableData(metaTO.errorDetails),
};
const doRouterTransformationResponse: RouterTransformationResponse[] =
await destHandler.processRouterDest(cloneDeep(destInputArray), requestMetadata);
await destHandler.processRouterDest(cloneDeep(destInputArray), metadataWithSvcCtx);
metaTO.metadata = destInputArray[0].metadata;
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
Expand Down Expand Up @@ -152,20 +164,24 @@ export class NativeIntegrationDestinationService implements DestinationService {
);
const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents);
const response = groupedEvents.map((destEvents) => {
const metaTO = this.getTags(
destinationType,
destEvents[0].metadata.destinationId,
destEvents[0].metadata.workspaceId,
tags.FEATURES.BATCH,
);
metaTO.metadatas = events.map((event) => event.metadata);
const metadataWithSvcCtx = {
...requestMetadata,
serviceContext: MiscService.getLoggableData(metaTO.errorDetails),
};
try {
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(
destEvents,
requestMetadata,
metadataWithSvcCtx,
);
return destBatchedRequests;
} catch (error: any) {
const metaTO = this.getTags(
destinationType,
destEvents[0].metadata.destinationId,
destEvents[0].metadata.workspaceId,
tags.FEATURES.BATCH,
);
metaTO.metadatas = events.map((event) => event.metadata);
const errResp = DestinationPostTransformationService.handleBatchTransformFailureEvents(
error,
metaTO,
Expand Down Expand Up @@ -262,10 +278,12 @@ export class NativeIntegrationDestinationService implements DestinationService {
error: `${destType}: Doesn't support deletion of users`,
} as UserDeletionResponse;
}
const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION);
try {
const result: UserDeletionResponse = await destUserDeletionHandler.processDeleteUsers({
...request,
rudderDestInfo,
serviceContext: MiscService.getLoggableData(metaTO.errorDetails),
});
stats.timing('regulation_worker_requests_dest_latency', startTime, {
feature: tags.FEATURES.USER_DELETION,
Expand All @@ -274,7 +292,6 @@ export class NativeIntegrationDestinationService implements DestinationService {
});
return result;
} catch (error: any) {
const metaTO = this.getTags(destType, 'unknown', 'unknown', tags.FEATURES.USER_DELETION);
return DestinationPostTransformationService.handleUserDeletionFailureEvents(
error,
metaTO,
Expand Down
33 changes: 27 additions & 6 deletions src/services/destination/postTransformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import { ErrorReportingService } from '../errorReporting';
import tags from '../../v0/util/tags';
import stats from '../../util/stats';
import { FixMe } from '../../util/types';
import { MiscService } from '../misc';

const defaultErrorMessages = {
router: '[Router Transform] Error occurred while processing the payload.',
delivery: '[Delivery] Error occured while processing payload',
} as const;

export class DestinationPostTransformationService {
public static handleProcessorTransformSucessEvents(
Expand Down Expand Up @@ -62,6 +68,10 @@ export class DestinationPostTransformationService {
error: errObj.message || '[Processor Transform] Error occurred while processing the payload.',
statTags: errObj.statTags,
} as ProcessorTransformationResponse;
MiscService.logError(
errObj.message || '[Processor Transform] Error occurred while processing the payload.',
metaTo.errorDetails,
);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand All @@ -87,12 +97,13 @@ export class DestinationPostTransformationService {
});
}

resultantPayloads.forEach((resp) => {
resultantPayloads.forEach((resp: RouterTransformationResponse) => {
if ('error' in resp && isObject(resp.statTags) && !isEmpty(resp.statTags)) {
resp.statTags = {
...resp.statTags,
...metaTo.errorDetails,
};
MiscService.logError(resp.error || defaultErrorMessages.router, metaTo.errorDetails);
stats.increment('event_transform_failure', metaTo.errorDetails);
} else {
stats.increment('event_transform_success', {
Expand All @@ -118,9 +129,10 @@ export class DestinationPostTransformationService {
metadata: metaTo.metadatas,
batched: false,
statusCode: errObj.status,
error: errObj.message || '[Router Transform] Error occurred while processing the payload.',
error: errObj.message || defaultErrorMessages.router,
statTags: errObj.statTags,
} as RouterTransformationResponse;
MiscService.logError(errObj.message || defaultErrorMessages.router, metaTo.errorDetails);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
stats.increment('event_transform_failure', metaTo.errorDetails);
return resp;
Expand All @@ -138,6 +150,10 @@ export class DestinationPostTransformationService {
error: errObj.message || '[Batch Transform] Error occurred while processing payload.',
statTags: errObj.statTags,
} as RouterTransformationResponse;
MiscService.logError(
errObj.message || '[Batch Transform] Error occurred while processing payload.',
metaTo.errorDetails,
);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand All @@ -149,14 +165,14 @@ export class DestinationPostTransformationService {
const errObj = generateErrorObject(error, metaTo.errorDetails, false);
const resp = {
status: errObj.status,
message: errObj.message || '[Delivery] Error occured while processing payload',
message: errObj.message || defaultErrorMessages.delivery,
destinationResponse: errObj.destinationResponse,
statTags: errObj.statTags,
...(errObj.authErrorCategory && {
authErrorCategory: errObj.authErrorCategory,
}),
} as DeliveryResponse;

MiscService.logError(errObj.message || defaultErrorMessages.delivery, metaTo.errorDetails);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand All @@ -168,6 +184,10 @@ export class DestinationPostTransformationService {
const errObj = generateErrorObject(error, metaTo.errorDetails, false);
const metadataArray = metaTo.metadatas;
if (!Array.isArray(metadataArray)) {
MiscService.logError(
'Proxy v1 endpoint error : metadataArray is not an array',
metaTo.errorDetails,
);
// Panic
throw new PlatformError('Proxy v1 endpoint error : metadataArray is not an array');
}
Expand All @@ -176,7 +196,7 @@ export class DestinationPostTransformationService {
error:
JSON.stringify(error.destinationResponse?.response) ||
errObj.message ||
'[Delivery] Error occured while processing payload',
defaultErrorMessages.delivery,
statusCode: errObj.status,
metadata,
} as DeliveryJobState;
Expand All @@ -190,7 +210,7 @@ export class DestinationPostTransformationService {
message: errObj.message.toString(),
status: errObj.status,
} as DeliveriesResponse;

MiscService.logError(errObj.message, metaTo.errorDetails);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand All @@ -208,6 +228,7 @@ export class DestinationPostTransformationService {
authErrorCategory: errObj.authErrorCategory,
}),
} as UserDeletionResponse;
MiscService.logError(errObj.message, metaTo.errorDetails);
ErrorReportingService.reportError(error, metaTo.errorContext, resp);
return resp;
}
Expand Down
Loading

0 comments on commit 1198ae1

Please sign in to comment.