Skip to content

Commit

Permalink
feat: aggregate metrics in a worker thread (#3458)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhawal1248 authored Jul 1, 2024
2 parents bb9369e + 92fabac commit 6be9d3e
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 14 deletions.
10 changes: 10 additions & 0 deletions src/routes/metricsRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ if (enableStats) {
ctx.body = error.message;
}
});

metricsRouter.get('/resetMetrics', async (ctx) => {
try {
await stats.resetMetricsController(ctx);
} catch (error) {
logger.error(error);
ctx.status = 400;
ctx.body = error.message;
}
});
}

module.exports = { metricsRouter };
29 changes: 17 additions & 12 deletions src/util/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const gracefulShutdown = require('http-graceful-shutdown');
const logger = require('../logger');
const { logProcessInfo } = require('./utils');
const { RedisDB } = require('./redis/redisConnector');
const { shutdownMetricsClient } = require('./stats');

const numWorkers = parseInt(process.env.NUM_PROCS || '1', 10);
const metricsPort = parseInt(process.env.METRICS_PORT || '9091', 10);
Expand All @@ -18,11 +19,12 @@ function finalFunction() {

// This function works only in master.
// It sends SIGTERM to all the workers
function shutdownWorkers() {
async function shutdownWorkers() {
Object.values(cluster.workers).forEach((worker) => {
process.kill(worker.process.pid);
logger.error(`Sent kill signal to worker ${worker.id} (pid: ${worker.process.pid})`);
});
await shutdownMetricsClient();
}

function start(port, app, metricsApp) {
Expand Down Expand Up @@ -51,35 +53,35 @@ function start(port, app, metricsApp) {
});

let isShuttingDown = false;
cluster.on('exit', (worker) => {
cluster.on('exit', async (worker) => {
if (!isShuttingDown) {
logger.error(`Worker (pid: ${worker.process.pid}) died`);
logger.error(`Killing other workers to avoid any side effects of the dead worker`);
logProcessInfo();
isShuttingDown = true;
shutdownWorkers();
await shutdownWorkers();
}
});

process.on('SIGTERM', () => {
process.on('SIGTERM', async () => {
logger.error('SIGTERM signal received. Closing workers...');
logProcessInfo();
isShuttingDown = true;
shutdownWorkers();
await shutdownWorkers();
});

process.on('SIGINT', () => {
process.on('SIGINT', async () => {
logger.error('SIGINT signal received. Closing workers...');
logProcessInfo();
isShuttingDown = true;
shutdownWorkers();
await shutdownWorkers();
});

process.on('SIGSEGV', () => {
process.on('SIGSEGV', async () => {
logger.error('SIGSEGV - JavaScript memory error occurred. Closing workers...');
logProcessInfo();
isShuttingDown = true;
shutdownWorkers();
await shutdownWorkers();
});
} else {
const server = app.listen(port);
Expand All @@ -90,16 +92,19 @@ function start(port, app, metricsApp) {
finally: finalFunction,
});

process.on('SIGTERM', () => {
process.on('SIGTERM', async () => {
logger.error(`SIGTERM signal received in the worker`);
await shutdownMetricsClient();
});

process.on('SIGINT', () => {
process.on('SIGINT', async () => {
logger.error(`SIGINT signal received in the worker`);
await shutdownMetricsClient();
});

process.on('SIGSEGV', () => {
process.on('SIGSEGV', async () => {
logger.error(`SIGSEGV - JavaScript memory error occurred in the worker`);
await shutdownMetricsClient();
});

logger.info(`Worker (pid: ${process.pid}) has started`);
Expand Down
181 changes: 181 additions & 0 deletions src/util/metricsAggregator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/* eslint-disable */
const cluster = require('cluster');
const logger = require('../logger');
const { Worker, isMainThread } = require('worker_threads');

const MESSAGE_TYPES = {
GET_METRICS_REQ: 'rudder-transformer:getMetricsReq',
GET_METRICS_RES: 'rudder-transformer:getMetricsRes',
AGGREGATE_METRICS_REQ: 'rudder-transformer:aggregateMetricsReq',
AGGREGATE_METRICS_RES: 'rudder-transformer:aggregateMetricsRes',
RESET_METRICS_REQ: 'rudder-transformer:resetMetricsReq',
};

const config = {
isPeriodicResetEnabled: process.env.METRICS_AGGREGATOR_PERIODIC_RESET_ENABLED === 'true',
periodicResetInterval: process.env.METRICS_AGGREGATOR_PERIODIC_RESET_INTERVAL_SECONDS
? parseInt(process.env.METRICS_AGGREGATOR_PERIODIC_RESET_INTERVAL_SECONDS, 10)

Check warning on line 17 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L17

Added line #L17 was not covered by tests
: 30 * 60,
};

class MetricsAggregator {
constructor(prometheusInstance) {
this.metricsBuffer = [];
this.pendingMetricRequests = 0;
this.resolveFunc = null;
this.rejectFunc = null;
this.prometheusInstance = prometheusInstance;
this.createWorkerThread();
this.registerCallbacks();
}

// onWorkerMessage is called when the master receives a message from a worker
async onWorkerMessage(worker, message) {
if (message.type === MESSAGE_TYPES.GET_METRICS_RES) {
logger.debug(`[MetricsAggregator] Master received metrics from worker ${worker.id}`);
await this.handleMetricsResponse(message);
}
}

// onMasterMessage is called when a worker receives a message from the master
async onMasterMessage(message) {
if (message.type === MESSAGE_TYPES.GET_METRICS_REQ) {
logger.debug(`[MetricsAggregator] Worker ${cluster.worker.id} received metrics request`);
try {
const metrics = await this.prometheusInstance.prometheusRegistry.getMetricsAsJSON();
cluster.worker.send({ type: MESSAGE_TYPES.GET_METRICS_RES, metrics });
} catch (error) {
cluster.worker.send({ type: MESSAGE_TYPES.GET_METRICS_RES, error: error.message });

Check warning on line 48 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L48

Added line #L48 was not covered by tests
}
} else if (message.type === MESSAGE_TYPES.RESET_METRICS_REQ) {
logger.info(`[MetricsAggregator] Worker ${cluster.worker.id} received reset metrics request`);
this.prometheusInstance.prometheusRegistry.resetMetrics();
logger.info(`[MetricsAggregator] Worker ${cluster.worker.id} reset metrics successfully`);

Check warning on line 53 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L51-L53

Added lines #L51 - L53 were not covered by tests
}
}

registerCallbacks() {
if (cluster.isPrimary) {
// register callback for master process
cluster.on('message', this.onWorkerMessage.bind(this));
if (config.isPeriodicResetEnabled) {
// register callback to reset metrics if enabled
this.registerCallbackForPeriodicReset(config.periodicResetInterval);

Check warning on line 63 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L63

Added line #L63 was not covered by tests
}
return;
}
// register callback for worker process
cluster.worker.on('message', this.onMasterMessage.bind(this));

Check warning on line 68 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L68

Added line #L68 was not covered by tests
}

registerCallbackForPeriodicReset(intervalSeconds) {
// store the timer in the aggregator for future operations like shutdown
this.periodicResetTimer = setInterval(() => {
logger.info(
`[MetricsAggregator] Periodic reset interval of ${intervalSeconds} seconds expired, reseting metrics`,
);
this.resetMetrics();
}, intervalSeconds * 1000);
}

createWorkerThread() {
if (cluster.isPrimary && isMainThread) {
this.workerThread = new Worker('./src/util/worker.js');
logger.info(
`[MetricsAggregator] Worker thread created with threadId ${this.workerThread.threadId}`,
);

this.workerThread.on('message', (message) => {
if (message.type === MESSAGE_TYPES.AGGREGATE_METRICS_RES) {
if (message.error) {
this.rejectFunc(new Error(message.error));
this.resetAggregator();
return;

Check warning on line 93 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L91-L93

Added lines #L91 - L93 were not covered by tests
}
this.resolveFunc(message.metrics);
this.resetAggregator();
}
});
}
}

resetAggregator() {
this.metricsBuffer = [];
this.pendingMetricRequests = 0;
this.resolveFunc = null;
this.rejectFunc = null;
}

async aggregateMetrics() {
// If a request is already being processed, reject the new request
// Use resolveFunc to check if a request is already being processed
// we dont support concurrent /metrics requests for now - we would need to implement a requestId mechanism and then handle all message calls accoridng to this requestId
// this is how it is implemented in prom-client [https://github.com/siimon/prom-client/blob/564e46724e258704df52ab329a7be833aaed4b69/lib/cluster.js#L43]
// we are not implementing it for now to keep things simple, once we validate the solution we can implement it
if (this.resolveFunc !== null) {
logger.error(

Check warning on line 116 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L116

Added line #L116 was not covered by tests
'[MetricsAggregator] Failed to serve /metrics request, a request is already being processed.',
);
throw new Error(

Check warning on line 119 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L119

Added line #L119 was not covered by tests
'[MetricsAggregator] Currently processing a request, please try again later.',
);
}
return new Promise((resolve, reject) => {
this.resolveFunc = resolve;
this.rejectFunc = reject;
for (const id in cluster.workers) {
this.pendingMetricRequests++;
logger.debug(`[MetricsAggregator] Requesting metrics from worker ${id}`);
cluster.workers[id].send({ type: MESSAGE_TYPES.GET_METRICS_REQ });
}
});
}

async aggregateMetricsInWorkerThread() {
this.workerThread.postMessage({
type: MESSAGE_TYPES.AGGREGATE_METRICS_REQ,
metrics: this.metricsBuffer,
});
}

async handleMetricsResponse(message) {
if (message.error) {
this.rejectFunc(new Error(message.error));
this.resetAggregator();
return;

Check warning on line 145 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L143-L145

Added lines #L143 - L145 were not covered by tests
}
this.metricsBuffer.push(message.metrics);
this.pendingMetricRequests--;
if (this.pendingMetricRequests === 0) {
this.aggregateMetricsInWorkerThread();
}
}

async terminateWorkerThread() {
logger.info(
`[MetricsAggregator] Worker thread terminated with exit code ${await this.workerThread.terminate()}`,
);
}

resetMetrics() {
for (const id in cluster.workers) {
logger.info(`[MetricsAggregator] Resetting metrics for worker ${id}`);
cluster.workers[id].send({ type: MESSAGE_TYPES.RESET_METRICS_REQ });

Check warning on line 163 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L162-L163

Added lines #L162 - L163 were not covered by tests
}
}

async shutdown() {
// terminate worker thread if the current process is the master
if (cluster.isPrimary) {
await this.terminateWorkerThread();
}
if (this.periodicResetTimer) {
clearInterval(this.periodicResetTimer);
}
}
}

module.exports = {
MetricsAggregator,
MESSAGE_TYPES,
};
29 changes: 27 additions & 2 deletions src/util/prometheus.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const prometheusClient = require('prom-client');
const logger = require('../logger');
const { MetricsAggregator } = require('./metricsAggregator');

const clusterEnabled = process.env.CLUSTER_ENABLED !== 'false';
const useMetricsAggregator = process.env.USE_METRICS_AGGREGATOR === 'true';
const instanceID = process.env.INSTANCE_ID || 'localhost';
const prefix = 'transformer';
const defaultLabels = { instanceName: instanceID };
Expand All @@ -12,6 +14,9 @@ function appendPrefix(name) {

class Prometheus {
constructor(enableSummaryMetrics = true) {
if (clusterEnabled && useMetricsAggregator) {
this.metricsAggregator = new MetricsAggregator(this);

Check warning on line 18 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L18

Added line #L18 was not covered by tests
}
this.prometheusRegistry = new prometheusClient.Registry();
this.prometheusRegistry.setDefaultLabels(defaultLabels);
prometheusClient.collectDefaultMetrics({
Expand All @@ -27,15 +32,29 @@ class Prometheus {
async metricsController(ctx) {
ctx.status = 200;
if (clusterEnabled) {
ctx.type = this.aggregatorRegistry.contentType;
ctx.body = await this.aggregatorRegistry.clusterMetrics();
if (useMetricsAggregator) {
ctx.type = this.prometheusRegistry.contentType;
ctx.body = await this.metricsAggregator.aggregateMetrics();
} else {
ctx.type = this.aggregatorRegistry.contentType;
ctx.body = await this.aggregatorRegistry.clusterMetrics();

Check warning on line 40 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L36-L40

Added lines #L36 - L40 were not covered by tests
}
} else {
ctx.type = this.prometheusRegistry.contentType;
ctx.body = await this.prometheusRegistry.metrics();
}
return ctx.body;
}

async resetMetricsController(ctx) {
ctx.status = 200;

Check warning on line 50 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L49-L50

Added lines #L49 - L50 were not covered by tests
if (clusterEnabled && useMetricsAggregator) {
this.metricsAggregator.resetMetrics();

Check warning on line 52 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L52

Added line #L52 was not covered by tests
}
ctx.body = 'Metrics reset';
return ctx.body;

Check warning on line 55 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L54-L55

Added lines #L54 - L55 were not covered by tests
}

newCounterStat(name, help, labelNames) {
const counter = new prometheusClient.Counter({
name,
Expand Down Expand Up @@ -192,6 +211,12 @@ class Prometheus {
}
}

async shutdown() {

Check warning on line 214 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L214

Added line #L214 was not covered by tests
if (this.metricsAggregator) {
await this.metricsAggregator.shutdown();

Check warning on line 216 in src/util/prometheus.js

View check run for this annotation

Codecov / codecov/patch

src/util/prometheus.js#L216

Added line #L216 was not covered by tests
}
}

createMetrics(enableSummaryMetrics) {
const metrics = [
// Counters
Expand Down
28 changes: 28 additions & 0 deletions src/util/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ async function metricsController(ctx) {
ctx.body = `Not supported`;
}

async function resetMetricsController(ctx) {

Check warning on line 108 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L108

Added line #L108 was not covered by tests
if (!enableStats || !statsClient) {
ctx.status = 501;
ctx.body = `Not supported`;
return;

Check warning on line 112 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L110-L112

Added lines #L110 - L112 were not covered by tests
}

if (statsClientType === 'prometheus') {
await statsClient.resetMetricsController(ctx);
return;

Check warning on line 117 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L116-L117

Added lines #L116 - L117 were not covered by tests
}

ctx.status = 501;
ctx.body = `Not supported`;

Check warning on line 121 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L120-L121

Added lines #L120 - L121 were not covered by tests
}

async function shutdownMetricsClient() {

Check warning on line 124 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L124

Added line #L124 was not covered by tests
if (!enableStats || !statsClient) {
return;

Check warning on line 126 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L126

Added line #L126 was not covered by tests
}

if (statsClientType === 'prometheus') {
await statsClient.shutdown();

Check warning on line 130 in src/util/stats.js

View check run for this annotation

Codecov / codecov/patch

src/util/stats.js#L130

Added line #L130 was not covered by tests
}
}

init();

module.exports = {
Expand All @@ -117,4 +143,6 @@ module.exports = {
gauge,
histogram,
metricsController,
resetMetricsController,
shutdownMetricsClient,
};
Loading

0 comments on commit 6be9d3e

Please sign in to comment.