Skip to content

Commit

Permalink
fix(sdk-metrics): merge uncollected delta accumulations (#3667)
Browse files Browse the repository at this point in the history
Co-authored-by: Marc Pichler <[email protected]>
  • Loading branch information
legendecas and pichlermarc authored Mar 14, 2023
1 parent d82a098 commit 2276977
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

* fix(core): added falsy check to make otel core work with browser where webpack config had process as false or null [#3613](https://github.com/open-telemetry/opentelemetry-js/issues/3613) @ravindra-dyte
* fix(instrumentation-http): include query params in http.target [#3646](https://github.com/open-telemetry/opentelemetry-js/pull/3646) @kobi-co
* fix(sdk-metrics): merge uncollected delta accumulations [#3667](https://github.com/open-telemetry/opentelemetry-js/pull/3667) @legendecas

### :books: (Refine Doc)

Expand Down
11 changes: 11 additions & 0 deletions packages/sdk-metrics/src/state/DeltaMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
this._aggregator.createAccumulation(collectionTime);
accumulation?.record(value);
let delta = accumulation;
// Diff with recorded cumulative memo.
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
// has() returned true, previous is present.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -66,6 +67,16 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
)!;
delta = this._aggregator.diff(previous, accumulation);
}
// Merge with uncollected active delta.
if (this._activeCollectionStorage.has(attributes, hashCode)) {
// has() returned true, previous is present.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const active = this._activeCollectionStorage.get(
attributes,
hashCode
)!;
delta = this._aggregator.merge(active, delta);
}

// Save the current record and the delta record.
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { DataPointType, MeterProvider, MetricReader } from '../../src';
import { TestDeltaMetricReader } from '../export/TestMetricReader';
import { assertDataPoint, assertMetricData } from '../util';

// https://github.com/open-telemetry/opentelemetry-js/issues/3664

describe('two-metric-readers-async-instrument', () => {
it('both metric readers should collect metrics', async () => {
const meterProvider = new MeterProvider();
const reader1 = new TestDeltaMetricReader();
const reader2 = new TestDeltaMetricReader();

meterProvider.addMetricReader(reader1);
meterProvider.addMetricReader(reader2);

const meter = meterProvider.getMeter('my-meter');

let counter = 1;
const asyncUpDownCounter = meter.createObservableUpDownCounter(
'my_async_updowncounter'
);
asyncUpDownCounter.addCallback(observableResult => {
observableResult.observe(counter);
});

await assertCollection(reader1, 1);
await assertCollection(reader2, 1);

counter = 10;
await assertCollection(reader1, 9);
await assertCollection(reader2, 9);

async function assertCollection(reader: MetricReader, value: number) {
const { errors, resourceMetrics } = await reader.collect();
assert.strictEqual(errors.length, 0);

// Collected only one Metric.
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];

assertMetricData(metric, DataPointType.SUM, {
name: 'my_async_updowncounter',
});
assertDataPoint(metric.dataPoints[0], {}, value);
}
});
});
20 changes: 20 additions & 0 deletions packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ describe('DeltaMetricProcessor', () => {
assert.strictEqual(accumulation?.toPointValue(), 11);
}
});

it('should merge with active delta of accumulations', () => {
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));

{
const measurements = new AttributeHashMap<number>();
measurements.set({}, 10);
metricProcessor.batchCumulate(measurements, [0, 0]);
}

{
const measurements = new AttributeHashMap<number>();
measurements.set({}, 20);
metricProcessor.batchCumulate(measurements, [1, 1]);
}

const accumulations = metricProcessor.collect();
const accumulation = accumulations.get({});
assert.strictEqual(accumulation?.toPointValue(), 20);
});
});

describe('collect', () => {
Expand Down

0 comments on commit 2276977

Please sign in to comment.