diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a330b09e72..edf81233a18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ ### :bug: (Bug Fix) +* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155) * fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc * Instruments that were created, but did not have measurements will not be exported anymore diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 7d84e0c7349..fef5c80dc0f 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase private readonly _scheduledDelayMillis: number; private readonly _exportTimeoutMillis: number; + private _isExporting = false; private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; @@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase } private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { + if (this._isExporting) return; + const flush = () => { + this._isExporting = true; this._flushOneBatch() .then(() => { + this._isExporting = false; if (this._finishedSpans.length > 0) { this._clearTimer(); this._maybeStartTimer(); } }) .catch(e => { + this._isExporting = false; globalErrorHandler(e); }); - }, this._scheduledDelayMillis); + }; + // we only wait if the queue doesn't have enough elements yet + if (this._finishedSpans.length >= this._maxExportBatchSize) { + return flush(); + } + if (this._timer !== undefined) return; + this._timer = setTimeout(() => flush(), this._scheduledDelayMillis); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc599..83fb3ebe44f 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -16,6 +16,7 @@ import { diag, ROOT_CONTEXT } from '@opentelemetry/api'; import { + ExportResult, ExportResultCode, loggingErrorHandler, setGlobalErrorHandler, @@ -27,7 +28,9 @@ import { BasicTracerProvider, BufferConfig, InMemorySpanExporter, + ReadableSpan, Span, + SpanExporter, } from '../../../src'; import { context } from '@opentelemetry/api'; import { TestRecordOnlySampler } from './TestRecordOnlySampler'; @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { - const clock = sinon.useFakeTimers(); + it('should export the sampled spans with buffer size reached', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - const span = createSampledSpan(`${name}_6`); processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); - - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); + assert.strictEqual(exporter.getFinishedSpans().length, 5); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } setTimeout(() => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); done(); }, defaultBufferConfig.scheduledDelayMillis + 1000); @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => { it('should force flush on demand', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.forceFlush(); - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); }); it('should not export empty span lists', done => { @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => { const debugStub = sinon.spy(diag, 'debug'); const warnStub = sinon.spy(diag, 'warn'); const span = createSampledSpan('test'); - for (let i = 0, j = 6; i < j; i++) { + for (let i = 0, j = 12; i < j; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } - assert.equal(processor['_finishedSpans'].length, 6); - assert.equal(processor['_droppedSpansCount'], 0); - sinon.assert.notCalled(debugStub); - - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - assert.equal(processor['_finishedSpans'].length, 6); assert.equal(processor['_droppedSpansCount'], 1); sinon.assert.calledOnce(debugStub); @@ -517,4 +505,45 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + + describe('Concurrency', () => { + it('should only send a single batch at a time', async () => { + const callbacks: ((result: ExportResult) => void)[] = []; + const spans: ReadableSpan[] = []; + const exporter: SpanExporter = { + export: async ( + exportedSpans: ReadableSpan[], + resultCallback: (result: ExportResult) => void + ) => { + callbacks.push(resultCallback); + spans.push(...exportedSpans); + }, + shutdown: async () => {}, + }; + const processor = new BatchSpanProcessor(exporter, { + maxExportBatchSize: 5, + maxQueueSize: 6, + }); + const totalSpans = 50; + for (let i = 0; i < totalSpans; i++) { + const span = createSampledSpan(`${name}_${i}`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + } + assert.equal(callbacks.length, 1); + assert.equal(spans.length, 5); + callbacks[0]({ code: ExportResultCode.SUCCESS }); + await new Promise(resolve => setTimeout(resolve, 0)); + // After the first batch completes we will have dropped a number + // of spans and the next batch will be smaller + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + callbacks[1]({ code: ExportResultCode.SUCCESS }); + + // We expect that all the other spans have been dropped + await new Promise(resolve => setTimeout(resolve, 0)); + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + }); + }); });