Skip to content

Commit

Permalink
fix: BatchExporter should export continuously when batch size is reac…
Browse files Browse the repository at this point in the history
…hed (#3958)

* fix: BathExporter should export continuously when batch size is reached

* fix: add tests

* lintfix

* add changelog

* add test for concurrency

* Update CHANGELOG.md

* Apply suggestions from code review

* Lint and fix browser tests

* fix: lint

---------

Co-authored-by: Daniel Dyla <[email protected]>
Co-authored-by: Marc Pichler <[email protected]>
  • Loading branch information
3 people authored Oct 6, 2023
1 parent 5ce32c0 commit 9122561
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :bug: (Bug Fix)

* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord
* fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155)
* fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc
* Instruments that were created, but did not have measurements will not be exported anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;

private _isExporting = false;
private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
private _shutdownOnce: BindOnceFuture<void>;
Expand Down Expand Up @@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;
this._timer = setTimeout(() => {
if (this._isExporting) return;
const flush = () => {
this._isExporting = true;
this._flushOneBatch()
.then(() => {
this._isExporting = false;
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
.catch(e => {
this._isExporting = false;
globalErrorHandler(e);
});
}, this._scheduledDelayMillis);
};
// we only wait if the queue doesn't have enough elements yet
if (this._finishedSpans.length >= this._maxExportBatchSize) {
return flush();
}
if (this._timer !== undefined) return;
this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
unrefTimer(this._timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { diag, ROOT_CONTEXT } from '@opentelemetry/api';
import {
ExportResult,
ExportResultCode,
loggingErrorHandler,
setGlobalErrorHandler,
Expand All @@ -27,7 +28,9 @@ import {
BasicTracerProvider,
BufferConfig,
InMemorySpanExporter,
ReadableSpan,
Span,
SpanExporter,
} from '../../../src';
import { context } from '@opentelemetry/api';
import { TestRecordOnlySampler } from './TestRecordOnlySampler';
Expand Down Expand Up @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => {
assert.strictEqual(spy.args.length, 0);
});

it('should export the sampled spans with buffer size reached', done => {
const clock = sinon.useFakeTimers();
it('should export the sampled spans with buffer size reached', async () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
processor.onStart(span, ROOT_CONTEXT);
assert.strictEqual(exporter.getFinishedSpans().length, 0);

processor.onEnd(span);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
}
const span = createSampledSpan(`${name}_6`);
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

setTimeout(async () => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
clock.restore();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should force flush when timeout exceeded', done => {
const clock = sinon.useFakeTimers();
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
}

setTimeout(() => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
assert.strictEqual(exporter.getFinishedSpans().length, 4);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);

Expand All @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => {

it('should force flush on demand', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
const span = createSampledSpan(name);
for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.strictEqual(exporter.getFinishedSpans().length, 0);
processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
assert.strictEqual(exporter.getFinishedSpans().length, 4);
});

it('should not export empty span lists', done => {
Expand Down Expand Up @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => {
const debugStub = sinon.spy(diag, 'debug');
const warnStub = sinon.spy(diag, 'warn');
const span = createSampledSpan('test');
for (let i = 0, j = 6; i < j; i++) {
for (let i = 0, j = 12; i < j; i++) {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.equal(processor['_finishedSpans'].length, 6);
assert.equal(processor['_droppedSpansCount'], 0);
sinon.assert.notCalled(debugStub);

processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.equal(processor['_finishedSpans'].length, 6);
assert.equal(processor['_droppedSpansCount'], 1);
sinon.assert.calledOnce(debugStub);
Expand Down Expand Up @@ -517,4 +505,45 @@ describe('BatchSpanProcessorBase', () => {
});
});
});

describe('Concurrency', () => {
it('should only send a single batch at a time', async () => {
const callbacks: ((result: ExportResult) => void)[] = [];
const spans: ReadableSpan[] = [];
const exporter: SpanExporter = {
export: async (
exportedSpans: ReadableSpan[],
resultCallback: (result: ExportResult) => void
) => {
callbacks.push(resultCallback);
spans.push(...exportedSpans);
},
shutdown: async () => {},
};
const processor = new BatchSpanProcessor(exporter, {
maxExportBatchSize: 5,
maxQueueSize: 6,
});
const totalSpans = 50;
for (let i = 0; i < totalSpans; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);
}
assert.equal(callbacks.length, 1);
assert.equal(spans.length, 5);
callbacks[0]({ code: ExportResultCode.SUCCESS });
await new Promise(resolve => setTimeout(resolve, 0));
// After the first batch completes we will have dropped a number
// of spans and the next batch will be smaller
assert.equal(callbacks.length, 2);
assert.equal(spans.length, 10);
callbacks[1]({ code: ExportResultCode.SUCCESS });

// We expect that all the other spans have been dropped
await new Promise(resolve => setTimeout(resolve, 0));
assert.equal(callbacks.length, 2);
assert.equal(spans.length, 10);
});
});
});

0 comments on commit 9122561

Please sign in to comment.