Skip to content

Commit

Permalink
fix(sdk-metrics): collect metrics when periodic exporting metric read…
Browse files Browse the repository at this point in the history
…er flushes (#3517)

Co-authored-by: Daniel Dyla <[email protected]>
  • Loading branch information
legendecas and dyladan authored Jan 14, 2023
1 parent 49c75b8 commit c87a304
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/
* `telemetry.sdk.version`
* fix(sdk-trace): make spans resilient to clock drift [#3434](https://github.com/open-telemetry/opentelemetry-js/pull/3434) @dyladan
* fix(selenium-tests): updated webpack version for selenium test issue [#3456](https://github.com/open-telemetry/opentelemetry-js/issues/3456) @SaumyaBhushan
* fix(sdk-metrics): collect metrics when periodic exporting metric reader flushes [#3517](https://github.com/open-telemetry/opentelemetry-js/pull/3517) @legendecas
* fix(sdk-metrics): fix duplicated registration of metrics for collectors [#3488](https://github.com/open-telemetry/opentelemetry-js/pull/3488) @legendecas
* fix(core): fix precision loss in numberToHrtime [#3480](https://github.com/open-telemetry/opentelemetry-js/pull/3480) @legendecas

Expand Down
38 changes: 23 additions & 15 deletions packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,25 @@ export class PeriodicExportingMetricReader extends MetricReader {
}

private async _runOnce(): Promise<void> {
const { resourceMetrics, errors } = await this.collect({});
try {
await callWithTimeout(this._doRun(), this._exportTimeout);
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error(
'Export took longer than %s milliseconds and timed out.',
this._exportTimeout
);
return;
}

globalErrorHandler(err);
}
}

private async _doRun(): Promise<void> {
const { resourceMetrics, errors } = await this.collect({
timeoutMillis: this._exportTimeout,
});

if (errors.length > 0) {
api.diag.error(
Expand All @@ -109,25 +127,15 @@ export class PeriodicExportingMetricReader extends MetricReader {

protected override onInitialized(): void {
// start running the interval as soon as this reader is initialized and keep handle for shutdown.
this._interval = setInterval(async () => {
try {
await callWithTimeout(this._runOnce(), this._exportTimeout);
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error(
'Export took longer than %s milliseconds and timed out.',
this._exportTimeout
);
return;
}

globalErrorHandler(err);
}
this._interval = setInterval(() => {
// this._runOnce never rejects. Using void operator to suppress @typescript-eslint/no-floating-promises.
void this._runOnce();
}, this._exportInterval);
unrefTimer(this._interval);
}

protected async onForceFlush(): Promise<void> {
await this._runOnce();
await this._exporter.forceFlush();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const MAX_32_BIT_INT = 2 ** 31 - 1;
class TestMetricExporter implements PushMetricExporter {
public exportTime = 0;
public forceFlushTime = 0;
public throwException = false;
public failureResult = false;
public throwExport = false;
public throwFlush = false;
public rejectExport = false;
private _batches: ResourceMetrics[] = [];
private _shutdown: boolean = false;

Expand All @@ -49,11 +50,11 @@ class TestMetricExporter implements PushMetricExporter {
): void {
this._batches.push(metrics);

if (this.throwException) {
if (this.throwExport) {
throw new Error('Error during export');
}
setTimeout(() => {
if (this.failureResult) {
if (this.rejectExport) {
resultCallback({
code: ExportResultCode.FAILED,
error: new Error('some error'),
Expand All @@ -72,7 +73,7 @@ class TestMetricExporter implements PushMetricExporter {
}

async forceFlush(): Promise<void> {
if (this.throwException) {
if (this.throwFlush) {
throw new Error('Error during forceFlush');
}

Expand All @@ -91,6 +92,10 @@ class TestMetricExporter implements PushMetricExporter {
}
return this._batches.slice(0, numberOfExports);
}

getExports(): ResourceMetrics[] {
return this._batches.slice(0);
}
}

class TestDeltaMetricExporter extends TestMetricExporter {
Expand Down Expand Up @@ -203,7 +208,7 @@ describe('PeriodicExportingMetricReader', () => {
describe('periodic export', () => {
it('should keep running on export errors', async () => {
const exporter = new TestMetricExporter();
exporter.throwException = true;
exporter.throwExport = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: 30,
Expand All @@ -218,13 +223,13 @@ describe('PeriodicExportingMetricReader', () => {
emptyResourceMetrics,
]);

exporter.throwException = false;
exporter.throwExport = false;
await reader.shutdown();
});

it('should keep running on export failure', async () => {
const exporter = new TestMetricExporter();
exporter.failureResult = true;
exporter.rejectExport = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: 30,
Expand All @@ -239,7 +244,7 @@ describe('PeriodicExportingMetricReader', () => {
emptyResourceMetrics,
]);

exporter.failureResult = false;
exporter.rejectExport = false;
await reader.shutdown();
});

Expand All @@ -261,7 +266,7 @@ describe('PeriodicExportingMetricReader', () => {
emptyResourceMetrics,
]);

exporter.throwException = false;
exporter.throwExport = false;
await reader.shutdown();
});
});
Expand All @@ -271,7 +276,7 @@ describe('PeriodicExportingMetricReader', () => {
sinon.restore();
});

it('should forceFlush exporter', async () => {
it('should collect and forceFlush exporter', async () => {
const exporter = new TestMetricExporter();
const exporterMock = sinon.mock(exporter);
exporterMock.expects('forceFlush').calledOnceWithExactly();
Expand All @@ -284,6 +289,10 @@ describe('PeriodicExportingMetricReader', () => {
reader.setMetricProducer(new TestMetricProducer());
await reader.forceFlush();
exporterMock.verify();

const exports = exporter.getExports();
assert.strictEqual(exports.length, 1);

await reader.shutdown();
});

Expand All @@ -307,12 +316,13 @@ describe('PeriodicExportingMetricReader', () => {

it('should throw when exporter throws', async () => {
const exporter = new TestMetricExporter();
exporter.throwException = true;
exporter.throwFlush = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});
reader.setMetricProducer(new TestMetricProducer());

await assertRejects(() => reader.forceFlush(), /Error during forceFlush/);
});
Expand Down Expand Up @@ -454,7 +464,7 @@ describe('PeriodicExportingMetricReader', () => {

it('should throw on non-initialized instance.', async () => {
const exporter = new TestMetricExporter();
exporter.throwException = true;
exporter.throwFlush = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
Expand Down

0 comments on commit c87a304

Please sign in to comment.