Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sre 484 test out aggregating metrics in a worker thread with reset #3466

3 changes: 0 additions & 3 deletions src/legacy/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,6 @@ if (startDestTransformer) {
ctx.status = ctxStatusCode;
ctx.set('apiVersion', API_VERSION);

stats.timing('user_transform_request_latency', startTime, {
processSessions,
});
stats.timingSummary('user_transform_request_latency_summary', startTime, {
processSessions,
});
Expand Down
14 changes: 13 additions & 1 deletion src/middleware.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
const Pyroscope = require('@pyroscope/nodejs');
const { getDestTypeFromContext } = require('@rudderstack/integrations-lib');
const cluster = require('cluster');
const stats = require('./util/stats');

function initPyroscope() {
Pyroscope.init({
appName: 'rudder-transformer',
appName: 'ut-rudder-transformer',
serverAddress: 'https://sdk-pyroscope.rudderstack.com',
tags: {
workerId: cluster.worker ? cluster.worker.id : 0,

Check warning on line 11 in src/middleware.js

View check run for this annotation

Codecov / codecov/patch

src/middleware.js#L11

Added line #L11 was not covered by tests
isMaster: cluster.isMaster,
},
});
Pyroscope.start();

Check warning on line 15 in src/middleware.js

View check run for this annotation

Codecov / codecov/patch

src/middleware.js#L15

Added line #L15 was not covered by tests
Pyroscope.startHeapCollecting();
process.on('SIGINT', () => {
Pyroscope.stop();
Pyroscope.stopHeapCollecting();

Check warning on line 19 in src/middleware.js

View check run for this annotation

Codecov / codecov/patch

src/middleware.js#L17-L19

Added lines #L17 - L19 were not covered by tests
});

}

function getCPUProfile(seconds) {
Expand Down
10 changes: 10 additions & 0 deletions src/routes/metricsRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ if (enableStats) {
ctx.body = error.message;
}
});

metricsRouter.get('/resetMetrics', async (ctx) => {
try {
await stats.resetMetricsController(ctx);
} catch (error) {
logger.error(error);
ctx.status = 400;
ctx.body = error.message;
}
});
}

module.exports = { metricsRouter };
10 changes: 0 additions & 10 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@ export class UserTransformService {
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
} finally {
stats.timing('user_transform_request_latency', userFuncStartTime, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.timing('user_transform_batch_size', requestSize, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.timingSummary('user_transform_request_latency_summary', userFuncStartTime, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
Expand Down
1 change: 0 additions & 1 deletion src/util/customTransformer-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async function userTransformHandlerV1(
...(events.length && events[0].metadata ? getTransformationMetadata(events[0].metadata) : {}),
};
stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags);
}

Expand Down
1 change: 0 additions & 1 deletion src/util/customTransformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ async function runUserTransform(
};

stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags);
}

Expand Down
3 changes: 0 additions & 3 deletions src/util/customTransforrmationsStore-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ async function getTransformationCodeV1(versionId) {

responseStatusHandler(response.status, 'Transformation', versionId, url);
stats.increment('get_transformation_code', { success: 'true', ...tags });
stats.timing('get_transformation_code_time', startTime, tags);
stats.timingSummary('get_transformation_code_time_summary', startTime, tags);
const myJson = await response.json();
transformationCache[versionId] = myJson;
Expand All @@ -56,7 +55,6 @@ async function getLibraryCodeV1(versionId) {

responseStatusHandler(response.status, 'Transformation Library', versionId, url);
stats.increment('get_libraries_code', { success: 'true', ...tags });
stats.timing('get_libraries_code_time', startTime, tags);
stats.timingSummary('get_libraries_code_time_summary', startTime, tags);
const myJson = await response.json();
libraryCache[versionId] = myJson;
Expand Down Expand Up @@ -84,7 +82,6 @@ async function getRudderLibByImportName(importName) {

responseStatusHandler(response.status, 'Rudder Library', importName, url);
stats.increment('get_libraries_code', { success: 'true', ...tags });
stats.timing('get_libraries_code_time', startTime, tags);
stats.timingSummary('get_libraries_code_time_summary', startTime, tags);
const myJson = await response.json();
rudderLibraryCache[importName] = myJson;
Expand Down
1 change: 0 additions & 1 deletion src/util/customTransforrmationsStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ async function getTransformationCode(versionId) {

responseStatusHandler(response.status, 'Transformation', versionId, url);
stats.increment('get_transformation_code', { versionId, success: 'true' });
stats.timing('get_transformation_code_time', startTime, { versionId });
stats.timingSummary('get_transformation_code_time_summary', startTime, { versionId });
const myJson = await response.json();
myCache.set(versionId, myJson);
Expand Down
74 changes: 67 additions & 7 deletions src/util/metricsAggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
const GET_METRICS_RES = 'rudder-transformer:getMetricsRes';
const AGGREGATE_METRICS_REQ = 'rudder-transformer:aggregateMetricsReq';
const AGGREGATE_METRICS_RES = 'rudder-transformer:aggregateMetricsRes';
const RESET_METRICS_REQUEST = 'rudder-transformer:resetMetricsReq';

class MetricsAggregator {
constructor(prometheusInstance) {
Expand All @@ -18,28 +19,62 @@
this.registerCallbacks();
}

timing(name, start, tags = {}) {
try {
let metric = this.prometheusInstance.prometheusRegistry.getSingleMetric(name);

Check warning on line 24 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L22-L24

Added lines #L22 - L24 were not covered by tests
if (!metric) {
logger.warn(

Check warning on line 26 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L26

Added line #L26 was not covered by tests
`Prometheus: Timing metric ${name} not found in the registry. Creating a new one`,
);
metric = this.prometheusInstance.newHistogramStat(name, name, Object.keys(tags));

Check warning on line 29 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L29

Added line #L29 was not covered by tests
}
metric.observe(tags, (new Date() - start) / 1000);

Check warning on line 31 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L31

Added line #L31 was not covered by tests
} catch (e) {
logger.error(`Prometheus: Timing metric ${name} failed with error ${e}`);

Check warning on line 33 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L33

Added line #L33 was not covered by tests
}
}

registerCallbacks() {
if (cluster.isPrimary) {
// register callback for master process
cluster.on('message', async (worker, message) => {
const startTime = Date.now();

Check warning on line 41 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L41

Added line #L41 was not covered by tests
if (message.type === GET_METRICS_RES) {
logger.debug(`[MetricsAggregator] Master received metrics from worker ${worker.id}`);
logger.info(

Check warning on line 43 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L43

Added line #L43 was not covered by tests
`${new Date().toISOString()} [MetricsAggregator] Master received metrics from worker ${worker.id}`,
);
this.timing('getMetricsReq', message.startTime, { workerId: worker.id });

Check warning on line 46 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L46

Added line #L46 was not covered by tests
await this.handleMetricsResponse(message);
}
logger.info(

Check warning on line 49 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L49

Added line #L49 was not covered by tests
`[MetricsAggregator] Master processed ${message.type} in ${new Date() - startTime} ms`,
);
});
return;
}
// register callback for worker process
cluster.worker.on('message', async (message) => {
const startTime = new Date();

Check warning on line 57 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L57

Added line #L57 was not covered by tests
if (message.type === GET_METRICS_REQ) {
logger.debug(`[MetricsAggregator] Worker ${cluster.worker.id} received metrics request`);
logger.info(

Check warning on line 59 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L59

Added line #L59 was not covered by tests
`${new Date().toISOString()} [MetricsAggregator] Worker ${cluster.worker.id} received metrics request`,
);
try {
const metrics = await this.prometheusInstance.prometheusRegistry.getMetricsAsJSON();
cluster.worker.send({ type: GET_METRICS_RES, metrics });
cluster.worker.send({ type: GET_METRICS_RES, metrics, startTime: message.startTime });

Check warning on line 64 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L64

Added line #L64 was not covered by tests
} catch (error) {
cluster.worker.send({ type: GET_METRICS_RES, error: error.message });
}
} else if (message.type === RESET_METRICS_REQUEST) {
logger.info(

Check warning on line 69 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L69

Added line #L69 was not covered by tests
`[MetricsAggregator] Worker ${cluster.worker.id} received reset metrics request `,
);
this.prometheusInstance.prometheusRegistry.resetMetrics();
logger.info(`[MetricsAggregator] Worker ${cluster.worker.id} reset metrics successfully`);

Check warning on line 73 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L72-L73

Added lines #L72 - L73 were not covered by tests
}
logger.info(

Check warning on line 75 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L75

Added line #L75 was not covered by tests
`[MetricsAggregator] Worker ${cluster.worker.id} processed ${message.type} in ${new Date() - startTime} ms`,
);
});
}

Expand All @@ -51,7 +86,12 @@
);

this.workerThread.on('message', (message) => {
const startTime = new Date();

Check warning on line 89 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L89

Added line #L89 was not covered by tests
if ((message.type = AGGREGATE_METRICS_RES)) {
logger.info(

Check warning on line 91 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L91

Added line #L91 was not covered by tests
`${new Date().toISOString()} [MetricsAggregator] Master received aggregated metrics from worker thread`,
);
this.timing('aggregateMetricsReq', message.startTime);

Check warning on line 94 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L94

Added line #L94 was not covered by tests
if (message.error) {
this.rejectFunc(new Error(message.error));
this.resetAggregator();
Expand All @@ -60,6 +100,9 @@
this.resolveFunc(message.metrics);
this.resetAggregator();
}
logger.info(

Check warning on line 103 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L103

Added line #L103 was not covered by tests
`[MetricsAggregator] Master processed ${message.type} in ${new Date() - startTime} ms`,
);
});
}
}
Expand Down Expand Up @@ -88,16 +131,22 @@
return new Promise((resolve, reject) => {
this.resolveFunc = resolve;
this.rejectFunc = reject;
const startTime = Date.now();

Check warning on line 134 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L134

Added line #L134 was not covered by tests
for (const id in cluster.workers) {
this.pendingMetricRequests++;
logger.debug(`[MetricsAggregator] Requesting metrics from worker ${id}`);
cluster.workers[id].send({ type: GET_METRICS_REQ });
logger.info(`[MetricsAggregator] Requesting metrics from worker ${id}`);
cluster.workers[id].send({ type: GET_METRICS_REQ, startTime });

Check warning on line 138 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L137-L138

Added lines #L137 - L138 were not covered by tests
}
});
}

async aggregateMetricsInWorkerThread() {
this.workerThread.postMessage({ type: AGGREGATE_METRICS_REQ, metrics: this.metricsBuffer });
this.metricsBuffer.push(await this.prometheusInstance.prometheusRegistry.getMetricsAsJSON());
this.workerThread.postMessage({

Check warning on line 145 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L144-L145

Added lines #L144 - L145 were not covered by tests
type: AGGREGATE_METRICS_REQ,
metrics: this.metricsBuffer,
startTime: Date.now(),
});
}

async handleMetricsResponse(message) {
Expand All @@ -109,7 +158,11 @@
this.metricsBuffer.push(message.metrics);
this.pendingMetricRequests--;
if (this.pendingMetricRequests === 0) {
this.aggregateMetricsInWorkerThread();
this.timing('getMetricsAll', message.startTime);
logger.info(

Check warning on line 162 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L161-L162

Added lines #L161 - L162 were not covered by tests
`${new Date().toISOString()} [MetricsAggregator] All metrics received, sending metrics to worker thread`,
);
await this.aggregateMetricsInWorkerThread();

Check warning on line 165 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L165

Added line #L165 was not covered by tests
}
}

Expand All @@ -118,6 +171,13 @@
`[MetricsAggregator] Worker thread terminated with exit code ${await this.workerThread.terminate()}`,
);
}

resetMetrics() {
for (const id in cluster.workers) {
logger.info(`[MetricsAggregator] Resetting metrics for worker ${id}`);
cluster.workers[id].send({ type: RESET_METRICS_REQUEST });

Check warning on line 178 in src/util/metricsAggregator.js

View check run for this annotation

Codecov / codecov/patch

src/util/metricsAggregator.js#L175-L178

Added lines #L175 - L178 were not covered by tests
}
}
}

module.exports = { MetricsAggregator, AGGREGATE_METRICS_REQ, AGGREGATE_METRICS_RES };
1 change: 0 additions & 1 deletion src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ const executeFaasFunction = async (
};

stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', startTime, tags);
stats.timingSummary('user_transform_function_latency_summary', startTime, tags);
}
};
Expand Down
Loading
Loading