diff --git a/CHANGELOG.md b/CHANGELOG.md index d6a363a9e1..345fe48283 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * fix(core): added falsy check to make otel core work with browser where webpack config had process as false or null [#3613](https://github.com/open-telemetry/opentelemetry-js/issues/3613) @ravindra-dyte * fix(instrumentation-http): include query params in http.target [#3646](https://github.com/open-telemetry/opentelemetry-js/pull/3646) @kobi-co +* fix(sdk-metrics): merge uncollected delta accumulations [#3667](https://github.com/open-telemetry/opentelemetry-js/pull/3667) @legendecas ### :books: (Refine Doc) diff --git a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts index 3e6cb5eefc..e9b16f0e03 100644 --- a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts @@ -57,6 +57,7 @@ export class DeltaMetricProcessor> { this._aggregator.createAccumulation(collectionTime); accumulation?.record(value); let delta = accumulation; + // Diff with recorded cumulative memo. if (this._cumulativeMemoStorage.has(attributes, hashCode)) { // has() returned true, previous is present. // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -66,6 +67,16 @@ export class DeltaMetricProcessor> { )!; delta = this._aggregator.diff(previous, accumulation); } + // Merge with uncollected active delta. + if (this._activeCollectionStorage.has(attributes, hashCode)) { + // has() returned true, previous is present. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const active = this._activeCollectionStorage.get( + attributes, + hashCode + )!; + delta = this._aggregator.merge(active, delta); + } // Save the current record and the delta record. this._cumulativeMemoStorage.set(attributes, accumulation, hashCode); diff --git a/packages/sdk-metrics/test/regression/two-metric-readers-async-instrument.test.ts b/packages/sdk-metrics/test/regression/two-metric-readers-async-instrument.test.ts new file mode 100644 index 0000000000..a1301be0ec --- /dev/null +++ b/packages/sdk-metrics/test/regression/two-metric-readers-async-instrument.test.ts @@ -0,0 +1,65 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { DataPointType, MeterProvider, MetricReader } from '../../src'; +import { TestDeltaMetricReader } from '../export/TestMetricReader'; +import { assertDataPoint, assertMetricData } from '../util'; + +// https://github.com/open-telemetry/opentelemetry-js/issues/3664 + +describe('two-metric-readers-async-instrument', () => { + it('both metric readers should collect metrics', async () => { + const meterProvider = new MeterProvider(); + const reader1 = new TestDeltaMetricReader(); + const reader2 = new TestDeltaMetricReader(); + + meterProvider.addMetricReader(reader1); + meterProvider.addMetricReader(reader2); + + const meter = meterProvider.getMeter('my-meter'); + + let counter = 1; + const asyncUpDownCounter = meter.createObservableUpDownCounter( + 'my_async_updowncounter' + ); + asyncUpDownCounter.addCallback(observableResult => { + observableResult.observe(counter); + }); + + await assertCollection(reader1, 1); + await assertCollection(reader2, 1); + + counter = 10; + await assertCollection(reader1, 9); + await assertCollection(reader2, 9); + + async function assertCollection(reader: MetricReader, value: number) { + const { errors, resourceMetrics } = await reader.collect(); + assert.strictEqual(errors.length, 0); + + // Collected only one Metric. + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + const metric = resourceMetrics.scopeMetrics[0].metrics[0]; + + assertMetricData(metric, DataPointType.SUM, { + name: 'my_async_updowncounter', + }); + assertDataPoint(metric.dataPoints[0], {}, value); + } + }); +}); diff --git a/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts b/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts index d2d69f8432..ec0a3d6fff 100644 --- a/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts @@ -100,6 +100,26 @@ describe('DeltaMetricProcessor', () => { assert.strictEqual(accumulation?.toPointValue(), 11); } }); + + it('should merge with active delta of accumulations', () => { + const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true)); + + { + const measurements = new AttributeHashMap(); + measurements.set({}, 10); + metricProcessor.batchCumulate(measurements, [0, 0]); + } + + { + const measurements = new AttributeHashMap(); + measurements.set({}, 20); + metricProcessor.batchCumulate(measurements, [1, 1]); + } + + const accumulations = metricProcessor.collect(); + const accumulation = accumulations.get({}); + assert.strictEqual(accumulation?.toPointValue(), 20); + }); }); describe('collect', () => {