From 912256184c07e9f510f4166e41d8b6e131e9446a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Fri, 6 Oct 2023 10:27:47 -0400 Subject: [PATCH 1/4] fix: BatchExporter should export continuously when batch size is reached (#3958) * fix: BathExporter should export continuously when batch size is reached * fix: add tests * lintfix * add changelog * add test for concurrency * Update CHANGELOG.md * Apply suggestions from code review * Lint and fix browser tests * fix: lint --------- Co-authored-by: Daniel Dyla Co-authored-by: Marc Pichler --- CHANGELOG.md | 1 + .../src/export/BatchSpanProcessorBase.ts | 16 +++- .../export/BatchSpanProcessorBase.test.ts | 85 +++++++++++++------ 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a330b09e7..edf81233a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ ### :bug: (Bug Fix) +* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155) * fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc * Instruments that were created, but did not have measurements will not be exported anymore diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 7d84e0c734..fef5c80dc0 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase private readonly _scheduledDelayMillis: number; private readonly _exportTimeoutMillis: number; + private _isExporting = false; private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; @@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase } private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { + if (this._isExporting) return; + const flush = () => { + this._isExporting = true; this._flushOneBatch() .then(() => { + this._isExporting = false; if (this._finishedSpans.length > 0) { this._clearTimer(); this._maybeStartTimer(); } }) .catch(e => { + this._isExporting = false; globalErrorHandler(e); }); - }, this._scheduledDelayMillis); + }; + // we only wait if the queue doesn't have enough elements yet + if (this._finishedSpans.length >= this._maxExportBatchSize) { + return flush(); + } + if (this._timer !== undefined) return; + this._timer = setTimeout(() => flush(), this._scheduledDelayMillis); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc59..83fb3ebe44 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -16,6 +16,7 @@ import { diag, ROOT_CONTEXT } from '@opentelemetry/api'; import { + ExportResult, ExportResultCode, loggingErrorHandler, setGlobalErrorHandler, @@ -27,7 +28,9 @@ import { BasicTracerProvider, BufferConfig, InMemorySpanExporter, + ReadableSpan, Span, + SpanExporter, } from '../../../src'; import { context } from '@opentelemetry/api'; import { TestRecordOnlySampler } from './TestRecordOnlySampler'; @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { - const clock = sinon.useFakeTimers(); + it('should export the sampled spans with buffer size reached', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - const span = createSampledSpan(`${name}_6`); processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); - - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); + assert.strictEqual(exporter.getFinishedSpans().length, 5); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } setTimeout(() => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); done(); }, defaultBufferConfig.scheduledDelayMillis + 1000); @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => { it('should force flush on demand', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.forceFlush(); - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); }); it('should not export empty span lists', done => { @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => { const debugStub = sinon.spy(diag, 'debug'); const warnStub = sinon.spy(diag, 'warn'); const span = createSampledSpan('test'); - for (let i = 0, j = 6; i < j; i++) { + for (let i = 0, j = 12; i < j; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } - assert.equal(processor['_finishedSpans'].length, 6); - assert.equal(processor['_droppedSpansCount'], 0); - sinon.assert.notCalled(debugStub); - - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - assert.equal(processor['_finishedSpans'].length, 6); assert.equal(processor['_droppedSpansCount'], 1); sinon.assert.calledOnce(debugStub); @@ -517,4 +505,45 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + + describe('Concurrency', () => { + it('should only send a single batch at a time', async () => { + const callbacks: ((result: ExportResult) => void)[] = []; + const spans: ReadableSpan[] = []; + const exporter: SpanExporter = { + export: async ( + exportedSpans: ReadableSpan[], + resultCallback: (result: ExportResult) => void + ) => { + callbacks.push(resultCallback); + spans.push(...exportedSpans); + }, + shutdown: async () => {}, + }; + const processor = new BatchSpanProcessor(exporter, { + maxExportBatchSize: 5, + maxQueueSize: 6, + }); + const totalSpans = 50; + for (let i = 0; i < totalSpans; i++) { + const span = createSampledSpan(`${name}_${i}`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + } + assert.equal(callbacks.length, 1); + assert.equal(spans.length, 5); + callbacks[0]({ code: ExportResultCode.SUCCESS }); + await new Promise(resolve => setTimeout(resolve, 0)); + // After the first batch completes we will have dropped a number + // of spans and the next batch will be smaller + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + callbacks[1]({ code: ExportResultCode.SUCCESS }); + + // We expect that all the other spans have been dropped + await new Promise(resolve => setTimeout(resolve, 0)); + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + }); + }); }); From 84861cd82722d507906a64016ef59b35bf7770ed Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Fri, 6 Oct 2023 16:30:35 +0200 Subject: [PATCH 2/4] chore(deps): update dependency @types/jquery to v3.5.21 (#4187) Co-authored-by: Daniel Dyla --- packages/opentelemetry-sdk-trace-web/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opentelemetry-sdk-trace-web/package.json b/packages/opentelemetry-sdk-trace-web/package.json index 2e13b13586..afa8e31c5b 100644 --- a/packages/opentelemetry-sdk-trace-web/package.json +++ b/packages/opentelemetry-sdk-trace-web/package.json @@ -60,7 +60,7 @@ "@opentelemetry/context-zone": "1.17.0", "@opentelemetry/propagator-b3": "1.17.0", "@opentelemetry/resources": "1.17.0", - "@types/jquery": "3.5.20", + "@types/jquery": "3.5.21", "@types/mocha": "10.0.2", "@types/node": "18.6.5", "@types/sinon": "10.0.18", From 5fd3737aa3c4f27fd68bb06bfb435d8badae63f0 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Sat, 7 Oct 2023 11:40:54 +0200 Subject: [PATCH 3/4] chore: remove outdated and empty docs (#4181) --- doc/library-author.md | 3 - doc/metrics.md | 2 +- doc/processor-api.md | 147 ------------------------------------------ doc/tracing.md | 2 +- 4 files changed, 2 insertions(+), 152 deletions(-) delete mode 100644 doc/library-author.md delete mode 100644 doc/processor-api.md diff --git a/doc/library-author.md b/doc/library-author.md deleted file mode 100644 index 44e5f1e97e..0000000000 --- a/doc/library-author.md +++ /dev/null @@ -1,3 +0,0 @@ -# OpenTelemetry for Library Authors - -TODO diff --git a/doc/metrics.md b/doc/metrics.md index 314fcbaf39..3f48775e49 100644 --- a/doc/metrics.md +++ b/doc/metrics.md @@ -1,6 +1,6 @@ # Metrics -This quick start is for end users of OpenTelemetry who wish to manually measure their applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually measure their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. For a high-level overview of OpenTelemetry metrics in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] diff --git a/doc/processor-api.md b/doc/processor-api.md deleted file mode 100644 index 58d7916e57..0000000000 --- a/doc/processor-api.md +++ /dev/null @@ -1,147 +0,0 @@ -# Processor API Guide - - - -The processor has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported. - -## Selecting a specific aggregator for metrics - -Sometimes you may want to use a specific aggregator for one of your metric, export an average of the last X values instead of just the last one. - -Here is what an aggregator that does that would look like: - -```ts -import { Aggregator } from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} -``` - -Now we will need to implement our own processor to configure the sdk to use our new aggregator. To simplify even more, we will just extend the `UngroupedProcessor` (which is the default) to avoid re-implementing the whole `Aggregator` interface. - -Here the result: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, -} from '@opentelemetry/sdk-metrics'; - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} -``` - -Finally, we need to specify to the `MeterProvider` to use our `CustomProcessor` when creating new meter: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, - MeterProvider, - Aggregator, -} from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} - -const meter = new MeterProvider({ - processor: new CustomProcessor(), - interval: 1000, -}).getMeter('example-custom-processor'); - -const requestsLatency = meter.createHistogram('requests', { - monotonic: true, - description: 'Average latency' -}); -``` diff --git a/doc/tracing.md b/doc/tracing.md index 77787549bc..392ca95736 100644 --- a/doc/tracing.md +++ b/doc/tracing.md @@ -1,6 +1,6 @@ # Tracing -This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. For a high-level overview of OpenTelemetry tracing in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] From c320c981c5b8cd9c42d65183c2c2c5b737a0b2a1 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Sat, 7 Oct 2023 05:53:41 -0400 Subject: [PATCH 4/4] deps: update proto-loader (#4192) --- experimental/packages/exporter-logs-otlp-grpc/package.json | 2 +- experimental/packages/exporter-trace-otlp-grpc/package.json | 2 +- .../opentelemetry-exporter-metrics-otlp-grpc/package.json | 2 +- .../packages/opentelemetry-instrumentation-grpc/package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/experimental/packages/exporter-logs-otlp-grpc/package.json b/experimental/packages/exporter-logs-otlp-grpc/package.json index bdbe715614..c7e8978fdf 100644 --- a/experimental/packages/exporter-logs-otlp-grpc/package.json +++ b/experimental/packages/exporter-logs-otlp-grpc/package.json @@ -49,7 +49,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/api-logs": "0.43.0", "@opentelemetry/otlp-exporter-base": "0.43.0", diff --git a/experimental/packages/exporter-trace-otlp-grpc/package.json b/experimental/packages/exporter-trace-otlp-grpc/package.json index 5c517b67c9..207880858b 100644 --- a/experimental/packages/exporter-trace-otlp-grpc/package.json +++ b/experimental/packages/exporter-trace-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/otlp-exporter-base": "0.43.0", "@types/mocha": "10.0.2", diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json index 270b77580c..9a158ffe39 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@types/mocha": "10.0.2", "@types/node": "18.6.5", diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/package.json b/experimental/packages/opentelemetry-instrumentation-grpc/package.json index a341ceabd7..b2248c30a1 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/package.json +++ b/experimental/packages/opentelemetry-instrumentation-grpc/package.json @@ -48,7 +48,7 @@ "devDependencies": { "@bufbuild/buf": "1.21.0-1", "@grpc/grpc-js": "^1.7.1", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/context-async-hooks": "1.17.0", "@opentelemetry/core": "1.17.0",