From bdc0f41c57776e95809414b7598107004aa2aa6b Mon Sep 17 00:00:00 2001 From: Abhimanyu Babbar Date: Wed, 13 Mar 2024 14:58:30 +0530 Subject: [PATCH] chore: base support for raising stat per tracking plan event (#2847) chore: added base support for raising event per tracking plan event --- src/controllers/trackingPlan.ts | 2 +- src/services/trackingPlan.ts | 113 ++++++++++++++++---------------- src/util/prometheus.js | 30 +++++++-- src/v0/util/index.js | 6 ++ 4 files changed, 88 insertions(+), 63 deletions(-) diff --git a/src/controllers/trackingPlan.ts b/src/controllers/trackingPlan.ts index 74e47e0ec9..e4802cfc4d 100644 --- a/src/controllers/trackingPlan.ts +++ b/src/controllers/trackingPlan.ts @@ -7,7 +7,7 @@ export class TrackingPlanController { const events = ctx.request.body; const requestSize = Number(ctx.request.get('content-length')); const reqParams = ctx.request.query; - const response = await TrackingPlanservice.validateTrackingPlan(events, requestSize, reqParams); + const response = await TrackingPlanservice.validate(events, requestSize, reqParams); ctx.body = response.body; ControllerUtility.postProcess(ctx, response.status); return ctx; diff --git a/src/services/trackingPlan.ts b/src/services/trackingPlan.ts index 2e68df55e9..93b6ee11ff 100644 --- a/src/services/trackingPlan.ts +++ b/src/services/trackingPlan.ts @@ -1,88 +1,87 @@ import logger from '../logger'; import { RetryRequestError, RespStatusError, constructValidationErrors } from '../util/utils'; -import { getMetadata } from '../v0/util'; +import { getMetadata, getTrackingPlanMetadata } from '../v0/util'; import eventValidator from '../util/eventValidation'; import stats from '../util/stats'; +import { HTTP_STATUS_CODES } from '../v0/util/constant'; export class TrackingPlanservice { - public static async validateTrackingPlan(events, requestSize, reqParams) { - const requestStartTime = new Date(); + public static async validate(events, requestSize, reqParams) { + const startTime = Date.now(); const respList: any[] = []; - const metaTags = events[0].metadata ? getMetadata(events[0].metadata) : {}; + const metaTags = events.length && events[0].metadata ? getMetadata(events[0].metadata) : {}; + const tpTags: any = + events.length && events[0].metadata ? getTrackingPlanMetadata(events[0].metadata) : {}; let ctxStatusCode = 200; for (let i = 0; i < events.length; i++) { + let eventValidationResponse: any; + let exceptionOccured = false; + const eventStartTime = Date.now(); const event = events[i]; - const eventStartTime = new Date(); + try { - const parsedEvent = event; - parsedEvent.request = { query: reqParams }; - const hv = await eventValidator.handleValidation(parsedEvent); - if (hv['dropEvent']) { - respList.push({ - output: event.message, - metadata: event.metadata, - statusCode: 400, - validationErrors: hv['validationErrors'], - error: JSON.stringify(constructValidationErrors(hv['validationErrors'])), - }); - stats.counter('tp_violation_type', 1, { - violationType: hv['violationType'], - ...metaTags, - }); - } else { - respList.push({ - output: event.message, - metadata: event.metadata, - statusCode: 200, - validationErrors: hv['validationErrors'], - error: JSON.stringify(constructValidationErrors(hv['validationErrors'])), - }); - stats.counter('tp_propagated_events', 1, { - ...metaTags, - }); - } - } catch (error) { - const errMessage = `Error occurred while validating : ${error}`; - logger.error(errMessage); - let status = 200; + event.request = { query: reqParams }; + const validatedEvent = await eventValidator.handleValidation(event); + eventValidationResponse = { + output: event.message, + metadata: event.metadata, + statusCode: validatedEvent['dropEvent'] + ? HTTP_STATUS_CODES.BAD_REQUEST + : HTTP_STATUS_CODES.OK, + validationErrors: validatedEvent['validationErrors'], + error: JSON.stringify(constructValidationErrors(validatedEvent['validationErrors'])), + }; + } catch (error: any) { + logger.debug( + `Error occurred while validating event`, + 'event', + `${event.message?.event}::${event.message?.type}`, + 'trackingPlan', + `${tpTags?.trackingPlanId}`, + 'error', + error.message, + ); + + exceptionOccured = true; + // no need to process further if + // we have error of retry request error if (error instanceof RetryRequestError) { ctxStatusCode = error.statusCode; + break; } - if (error instanceof RespStatusError) { - status = error.statusCode; - } - respList.push({ + + eventValidationResponse = { output: event.message, metadata: event.metadata, - statusCode: status, + statusCode: error instanceof RespStatusError ? error.statusCode : HTTP_STATUS_CODES.OK, validationErrors: [], - error: errMessage, - }); - stats.counter('tp_errors', 1, { - ...metaTags, - workspaceId: event.metadata?.workspaceId, - trackingPlanId: event.metadata?.trackingPlanId, - }); + error: `Error occurred while validating: ${error}`, + }; } finally { - stats.timing('tp_event_latency', eventStartTime, { + // finally on every event, we need to + // capture the information related to the validates event + stats.timing('tp_event_validation_latency', eventStartTime, { ...metaTags, + ...tpTags, + status: eventValidationResponse.statusCode, + exception: exceptionOccured, }); } - } - stats.counter('tp_events_count', events.length, { - ...metaTags, - }); + respList.push(eventValidationResponse); + } - stats.histogram('tp_request_size', requestSize, { + stats.histogram('tp_batch_size', requestSize, { ...metaTags, + ...tpTags, }); - stats.timing('tp_request_latency', requestStartTime, { + // capture overall function latency + // with metadata tags + stats.histogram('tp_batch_validation_latency', (Date.now() - startTime) / 1000, { ...metaTags, - workspaceId: events[0]?.metadata?.workspaceId, - trackingPlanId: events[0]?.metadata?.trackingPlanId, + ...tpTags, }); return { body: respList, status: ctxStatusCode }; diff --git a/src/util/prometheus.js b/src/util/prometheus.js index 89e5424c0c..b502681987 100644 --- a/src/util/prometheus.js +++ b/src/util/prometheus.js @@ -590,14 +590,34 @@ class Prometheus { labelNames: ['method', 'route', 'code'], }, { - name: 'tp_request_size', - help: 'tp_request_size', + name: 'tp_batch_size', + help: 'Size of batch of events for tracking plan validation', type: 'histogram', - labelNames: ['sourceType', 'destinationType', 'k8_namespace'], + labelNames: [ + 'sourceType', + 'destinationType', + 'k8_namespace', + 'workspaceId', + 'trackingPlanId', + ], + }, + { + name: 'tp_event_validation_latency', + help: 'Latency of validating tracking plan at event level', + type: 'histogram', + labelNames: [ + 'sourceType', + 'destinationType', + 'k8_namespace', + 'workspaceId', + 'trackingPlanId', + 'status', + 'exception', + ], }, { - name: 'tp_request_latency', - help: 'tp_request_latency', + name: 'tp_batch_validation_latency', + help: 'Latency of validating tracking plan at batch level', type: 'histogram', labelNames: [ 'sourceType', diff --git a/src/v0/util/index.js b/src/v0/util/index.js index c1debce088..32872cc5d9 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -1419,6 +1419,11 @@ function getStringValueOfJSON(json) { return output; } +const getTrackingPlanMetadata = (metadata) => ({ + trackingPlanId: metadata.trackingPlanId, + workspaceId: metadata.workspaceId, +}); + const getMetadata = (metadata) => ({ sourceType: metadata.sourceType, destinationType: metadata.destinationType, @@ -2267,6 +2272,7 @@ module.exports = { getMappingConfig, getMetadata, getTransformationMetadata, + getTrackingPlanMetadata, getParsedIP, getStringValueOfJSON, getSuccessRespEvents,