Skip to content

Commit

Permalink
fix(sdk-metrics): prevent per-reader storages from keeping unreported…
Browse files Browse the repository at this point in the history
… accumulations in memory
  • Loading branch information
pichlermarc committed Sep 26, 2023
1 parent f2fc0d8 commit 1f51d8c
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 86 deletions.
Empty file.
6 changes: 4 additions & 2 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,19 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
);
}

registerCollector(collector: MetricCollectorHandle): void {
this._temporalMetricStorage.registerCollector(collector);
}
}
10 changes: 5 additions & 5 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ export class MeterSharedState {
this.metricStorageRegistry.getStorages(collector)
)
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
collectionTime
);
return metricStorage.collect(collector, collectionTime);
})
.filter(isNotNullish);

Expand Down Expand Up @@ -137,6 +133,9 @@ export class MeterSharedState {
aggregator,
view.attributesProcessor
) as R;
for (const collector of this._meterProviderSharedState.metricCollectors) {
viewStorage.registerCollector(collector);
}
this.metricStorageRegistry.register(viewStorage);
return viewStorage;
});
Expand All @@ -161,6 +160,7 @@ export class MeterSharedState {
aggregator,
AttributesProcessor.Noop()
) as R;
storage.registerCollector(collector);
this.metricStorageRegistry.registerForCollector(collector, storage);
return storage;
}
Expand Down
11 changes: 10 additions & 1 deletion packages/sdk-metrics/src/state/MetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,19 @@ export abstract class MetricStorage {
*/
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData>;

/**
* Registers a collector that this storage will be used with. Failing to register
* a collector will result in dropped metrics.
*
* Note: Memory pressure may build if a collector is registered but does not collect
* on this instance. Once registered, ensure that `collect()` is called in reasonable intervals.
* @param collector
*/
abstract registerCollector(collector: MetricCollectorHandle): void;

getInstrumentDescriptor(): Readonly<InstrumentDescriptor> {
return this._instrumentDescriptor;
}
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,19 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
);
}

registerCollector(collector: MetricCollectorHandle): void {
this._temporalMetricStorage.registerCollector(collector);
}
}
25 changes: 15 additions & 10 deletions packages/sdk-metrics/src/state/TemporalMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {

constructor(private _aggregator: Aggregator<T>) {}

registerCollector(collector: MetricCollectorHandle) {
let stash = this._unreportedAccumulations.get(collector);
if (stash === undefined) {
stash = [];
this._unreportedAccumulations.set(collector, stash);
}
}

/**
* Builds the {@link MetricData} streams to report against a specific MetricCollector.
* @param collector The information of the MetricCollector.
Expand All @@ -74,12 +82,11 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
*/
buildMetrics(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
instrumentDescriptor: InstrumentDescriptor,
currentAccumulations: AttributeHashMap<T>,
collectionTime: HrTime
): Maybe<MetricData> {
this._stashAccumulations(collectors, currentAccumulations);
this._stashAccumulations(currentAccumulations);
const unreportedAccumulations =
this._getMergedUnreportedAccumulations(collector);

Expand Down Expand Up @@ -141,18 +148,16 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
);
}

private _stashAccumulations(
collectors: MetricCollectorHandle[],
currentAccumulation: AttributeHashMap<T>
) {
collectors.forEach(it => {
let stash = this._unreportedAccumulations.get(it);
private _stashAccumulations(currentAccumulation: AttributeHashMap<T>) {
const registeredCollectors = this._unreportedAccumulations.keys();
for (const collector of registeredCollectors) {
let stash = this._unreportedAccumulations.get(collector);
if (stash === undefined) {
stash = [];
this._unreportedAccumulations.set(it, stash);
this._unreportedAccumulations.set(collector, stash);
}
stash.push(currentAccumulation);
});
}
}

private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) {
Expand Down
37 changes: 13 additions & 24 deletions packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const cumulativeCollector: MetricCollectorHandle = {
describe('AsyncMetricStorage', () => {
describe('collect', () => {
describe('Delta Collector', () => {
const collectors = [deltaCollector];
it('should collect and reset memos', async () => {
const delegate = new ObservableCallbackDelegate();
const observableRegistry = new ObservableRegistry();
Expand All @@ -52,6 +51,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(deltaCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -70,7 +71,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand Down Expand Up @@ -104,11 +104,7 @@ describe('AsyncMetricStorage', () => {
{
const collectionTime: HrTime = [1, 1];
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);
const metric = metricStorage.collect(deltaCollector, collectionTime);

assertMetricData(metric, DataPointType.SUM);
assert.strictEqual(metric.dataPoints.length, 0);
Expand All @@ -124,7 +120,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand Down Expand Up @@ -163,6 +158,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(deltaCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -181,7 +178,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -207,7 +203,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -233,7 +228,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -257,6 +251,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(false),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(deltaCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -275,7 +271,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -301,7 +296,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -327,7 +321,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
deltaCollector,
collectors,
collectionTime
);

Expand All @@ -345,7 +338,6 @@ describe('AsyncMetricStorage', () => {
});

describe('Cumulative Collector', () => {
const collectors = [cumulativeCollector];
it('should collect cumulative metrics', async () => {
const delegate = new ObservableCallbackDelegate();
const observableRegistry = new ObservableRegistry();
Expand All @@ -354,6 +346,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(cumulativeCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -373,7 +367,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand Down Expand Up @@ -410,7 +403,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand Down Expand Up @@ -449,7 +441,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand Down Expand Up @@ -487,6 +478,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(cumulativeCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -505,7 +498,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand All @@ -531,7 +523,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand All @@ -558,7 +549,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand All @@ -582,6 +572,8 @@ describe('AsyncMetricStorage', () => {
new SumAggregator(false),
new NoopAttributesProcessor()
);
metricStorage.registerCollector(cumulativeCollector);

const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
Expand All @@ -600,7 +592,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand All @@ -626,7 +617,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand All @@ -652,7 +642,6 @@ describe('AsyncMetricStorage', () => {
await observableRegistry.observe(collectionTime);
const metric = metricStorage.collect(
cumulativeCollector,
collectors,
collectionTime
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import {
class TestMetricStorage extends MetricStorage {
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
return undefined;
}

registerCollector(collector: MetricCollectorHandle): void {}
}

describe('MetricStorageRegistry', () => {
Expand Down
Loading

0 comments on commit 1f51d8c

Please sign in to comment.