diff --git a/CHANGELOG.md b/CHANGELOG.md index da23b92268..b783d89012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ ### :bug: (Bug Fix) +* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155) * fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory [#4163](https://github.com/open-telemetry/opentelemetry-js/pull/4163) @pichlermarc * fixes a memory leak which occurred when two or more `MetricReader` instances are registered to a `MeterProvider` diff --git a/doc/library-author.md b/doc/library-author.md deleted file mode 100644 index 44e5f1e97e..0000000000 --- a/doc/library-author.md +++ /dev/null @@ -1,3 +0,0 @@ -# OpenTelemetry for Library Authors - -TODO diff --git a/doc/metrics.md b/doc/metrics.md index 314fcbaf39..3f48775e49 100644 --- a/doc/metrics.md +++ b/doc/metrics.md @@ -1,6 +1,6 @@ # Metrics -This quick start is for end users of OpenTelemetry who wish to manually measure their applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually measure their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. For a high-level overview of OpenTelemetry metrics in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] diff --git a/doc/processor-api.md b/doc/processor-api.md deleted file mode 100644 index 58d7916e57..0000000000 --- a/doc/processor-api.md +++ /dev/null @@ -1,147 +0,0 @@ -# Processor API Guide - - - -The processor has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported. - -## Selecting a specific aggregator for metrics - -Sometimes you may want to use a specific aggregator for one of your metric, export an average of the last X values instead of just the last one. - -Here is what an aggregator that does that would look like: - -```ts -import { Aggregator } from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} -``` - -Now we will need to implement our own processor to configure the sdk to use our new aggregator. To simplify even more, we will just extend the `UngroupedProcessor` (which is the default) to avoid re-implementing the whole `Aggregator` interface. - -Here the result: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, -} from '@opentelemetry/sdk-metrics'; - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} -``` - -Finally, we need to specify to the `MeterProvider` to use our `CustomProcessor` when creating new meter: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, - MeterProvider, - Aggregator, -} from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} - -const meter = new MeterProvider({ - processor: new CustomProcessor(), - interval: 1000, -}).getMeter('example-custom-processor'); - -const requestsLatency = meter.createHistogram('requests', { - monotonic: true, - description: 'Average latency' -}); -``` diff --git a/doc/tracing.md b/doc/tracing.md index 77787549bc..392ca95736 100644 --- a/doc/tracing.md +++ b/doc/tracing.md @@ -1,6 +1,6 @@ # Tracing -This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. For a high-level overview of OpenTelemetry tracing in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] diff --git a/experimental/packages/exporter-logs-otlp-grpc/package.json b/experimental/packages/exporter-logs-otlp-grpc/package.json index bdbe715614..c7e8978fdf 100644 --- a/experimental/packages/exporter-logs-otlp-grpc/package.json +++ b/experimental/packages/exporter-logs-otlp-grpc/package.json @@ -49,7 +49,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/api-logs": "0.43.0", "@opentelemetry/otlp-exporter-base": "0.43.0", diff --git a/experimental/packages/exporter-trace-otlp-grpc/package.json b/experimental/packages/exporter-trace-otlp-grpc/package.json index 5c517b67c9..207880858b 100644 --- a/experimental/packages/exporter-trace-otlp-grpc/package.json +++ b/experimental/packages/exporter-trace-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/otlp-exporter-base": "0.43.0", "@types/mocha": "10.0.2", diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json index 270b77580c..9a158ffe39 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@types/mocha": "10.0.2", "@types/node": "18.6.5", diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/package.json b/experimental/packages/opentelemetry-instrumentation-grpc/package.json index a341ceabd7..b2248c30a1 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/package.json +++ b/experimental/packages/opentelemetry-instrumentation-grpc/package.json @@ -48,7 +48,7 @@ "devDependencies": { "@bufbuild/buf": "1.21.0-1", "@grpc/grpc-js": "^1.7.1", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/context-async-hooks": "1.17.0", "@opentelemetry/core": "1.17.0", diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 7d84e0c734..fef5c80dc0 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase private readonly _scheduledDelayMillis: number; private readonly _exportTimeoutMillis: number; + private _isExporting = false; private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; @@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase } private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { + if (this._isExporting) return; + const flush = () => { + this._isExporting = true; this._flushOneBatch() .then(() => { + this._isExporting = false; if (this._finishedSpans.length > 0) { this._clearTimer(); this._maybeStartTimer(); } }) .catch(e => { + this._isExporting = false; globalErrorHandler(e); }); - }, this._scheduledDelayMillis); + }; + // we only wait if the queue doesn't have enough elements yet + if (this._finishedSpans.length >= this._maxExportBatchSize) { + return flush(); + } + if (this._timer !== undefined) return; + this._timer = setTimeout(() => flush(), this._scheduledDelayMillis); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc59..83fb3ebe44 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -16,6 +16,7 @@ import { diag, ROOT_CONTEXT } from '@opentelemetry/api'; import { + ExportResult, ExportResultCode, loggingErrorHandler, setGlobalErrorHandler, @@ -27,7 +28,9 @@ import { BasicTracerProvider, BufferConfig, InMemorySpanExporter, + ReadableSpan, Span, + SpanExporter, } from '../../../src'; import { context } from '@opentelemetry/api'; import { TestRecordOnlySampler } from './TestRecordOnlySampler'; @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { - const clock = sinon.useFakeTimers(); + it('should export the sampled spans with buffer size reached', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - const span = createSampledSpan(`${name}_6`); processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); - - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); + assert.strictEqual(exporter.getFinishedSpans().length, 5); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } setTimeout(() => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); done(); }, defaultBufferConfig.scheduledDelayMillis + 1000); @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => { it('should force flush on demand', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.forceFlush(); - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); }); it('should not export empty span lists', done => { @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => { const debugStub = sinon.spy(diag, 'debug'); const warnStub = sinon.spy(diag, 'warn'); const span = createSampledSpan('test'); - for (let i = 0, j = 6; i < j; i++) { + for (let i = 0, j = 12; i < j; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } - assert.equal(processor['_finishedSpans'].length, 6); - assert.equal(processor['_droppedSpansCount'], 0); - sinon.assert.notCalled(debugStub); - - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - assert.equal(processor['_finishedSpans'].length, 6); assert.equal(processor['_droppedSpansCount'], 1); sinon.assert.calledOnce(debugStub); @@ -517,4 +505,45 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + + describe('Concurrency', () => { + it('should only send a single batch at a time', async () => { + const callbacks: ((result: ExportResult) => void)[] = []; + const spans: ReadableSpan[] = []; + const exporter: SpanExporter = { + export: async ( + exportedSpans: ReadableSpan[], + resultCallback: (result: ExportResult) => void + ) => { + callbacks.push(resultCallback); + spans.push(...exportedSpans); + }, + shutdown: async () => {}, + }; + const processor = new BatchSpanProcessor(exporter, { + maxExportBatchSize: 5, + maxQueueSize: 6, + }); + const totalSpans = 50; + for (let i = 0; i < totalSpans; i++) { + const span = createSampledSpan(`${name}_${i}`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + } + assert.equal(callbacks.length, 1); + assert.equal(spans.length, 5); + callbacks[0]({ code: ExportResultCode.SUCCESS }); + await new Promise(resolve => setTimeout(resolve, 0)); + // After the first batch completes we will have dropped a number + // of spans and the next batch will be smaller + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + callbacks[1]({ code: ExportResultCode.SUCCESS }); + + // We expect that all the other spans have been dropped + await new Promise(resolve => setTimeout(resolve, 0)); + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + }); + }); }); diff --git a/packages/opentelemetry-sdk-trace-web/package.json b/packages/opentelemetry-sdk-trace-web/package.json index 2e13b13586..afa8e31c5b 100644 --- a/packages/opentelemetry-sdk-trace-web/package.json +++ b/packages/opentelemetry-sdk-trace-web/package.json @@ -60,7 +60,7 @@ "@opentelemetry/context-zone": "1.17.0", "@opentelemetry/propagator-b3": "1.17.0", "@opentelemetry/resources": "1.17.0", - "@types/jquery": "3.5.20", + "@types/jquery": "3.5.21", "@types/mocha": "10.0.2", "@types/node": "18.6.5", "@types/sinon": "10.0.18",