Skip to content

Commit

Permalink
feat(sdk-metrics)!: replace attributeKeys with an option to add custo…
Browse files Browse the repository at this point in the history
…m processors (#4532)
  • Loading branch information
pichlermarc authored Jun 18, 2024
1 parent a30989f commit 55f15e9
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 66 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_NEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* chore(otel-core): replace deprecated SpanAttributes [#4408](https://github.com/open-telemetry/opentelemetry-js/pull/4408) @JamieDanielson
* feat(sdk-metrics)!: remove MeterProvider.addMetricReader() in favor of constructor option [#4419](https://github.com/open-telemetry/opentelemetry-js/pull/4419) @pichlermarc
* chore(otel-resources): replace deprecated SpanAttributes [#4428](https://github.com/open-telemetry/opentelemetry-js/pull/4428) @JamieDanielson
* feat(sdk-metrics)!: remove MeterProvider.addMetricReader() in favor of constructor option [#4419](https://github.com/open-telemetry/opentelemetry-js/pull/4419) @pichlermarc
* feat(sdk-metrics)!: replace attributeKeys with custom processors option [#4532](https://github.com/open-telemetry/opentelemetry-js/pull/4532) @pichlermarc

### :rocket: (Enhancement)

Expand Down
6 changes: 6 additions & 0 deletions packages/sdk-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ export {

export { View, ViewOptions } from './view/View';

export {
IAttributesProcessor,
createAllowListAttributesProcessor,
createDenyListAttributesProcessor,
} from './view/AttributesProcessor';

export { TimeoutError } from './utils';
4 changes: 2 additions & 2 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import { HrTime } from '@opentelemetry/api';
import { Accumulation, Aggregator } from '../aggregator/types';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
Expand All @@ -26,6 +25,7 @@ import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { AttributeHashMap } from './HashMap';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';
import { IAttributesProcessor } from '../view/AttributesProcessor';

/**
* Internal interface.
Expand All @@ -42,7 +42,7 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _attributesProcessor: IAttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(_instrumentDescriptor);
Expand Down
9 changes: 6 additions & 3 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { ObservableRegistry } from './ObservableRegistry';
import { SyncMetricStorage } from './SyncMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { AttributesProcessor } from '../view/AttributesProcessor';
import {
createNoopAttributesProcessor,
IAttributesProcessor,
} from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';

/**
Expand Down Expand Up @@ -166,7 +169,7 @@ export class MeterSharedState {
const storage = new MetricStorageType(
descriptor,
aggregator,
AttributesProcessor.Noop(),
createNoopAttributesProcessor(),
[collector]
) as R;
this.metricStorageRegistry.registerForCollector(collector, storage);
Expand All @@ -189,7 +192,7 @@ interface MetricStorageConstructor {
new (
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<Maybe<Accumulation>>,
attributesProcessor: AttributesProcessor,
attributesProcessor: IAttributesProcessor,
collectors: MetricCollectorHandle[]
): MetricStorage;
}
4 changes: 2 additions & 2 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { Context, HrTime, MetricAttributes } from '@opentelemetry/api';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { IAttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
Expand All @@ -41,7 +41,7 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _attributesProcessor: IAttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(instrumentDescriptor);
Expand Down
98 changes: 77 additions & 21 deletions packages/sdk-metrics/src/view/AttributesProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,46 @@
* limitations under the License.
*/

import { Context, MetricAttributes } from '@opentelemetry/api';
import { Context, Attributes } from '@opentelemetry/api';

/**
* The {@link AttributesProcessor} is responsible for customizing which
* attribute(s) are to be reported as metrics dimension(s) and adding
* additional dimension(s) from the {@link Context}.
*/
export abstract class AttributesProcessor {
export interface IAttributesProcessor {
/**
* Process the metric instrument attributes.
*
* @param incoming The metric instrument attributes.
* @param context The active context when the instrument is synchronous.
* `undefined` otherwise.
*/
abstract process(
incoming: MetricAttributes,
context?: Context
): MetricAttributes;

static Noop() {
return NOOP;
}
process: (incoming: Attributes, context?: Context) => Attributes;
}

export class NoopAttributesProcessor extends AttributesProcessor {
process(incoming: MetricAttributes, _context?: Context) {
class NoopAttributesProcessor implements IAttributesProcessor {
process(incoming: Attributes, _context?: Context) {
return incoming;
}
}

/**
* {@link AttributesProcessor} that filters by allowed attribute names and drops any names that are not in the
* allow list.
*/
export class FilteringAttributesProcessor extends AttributesProcessor {
constructor(private _allowedAttributeNames: string[]) {
super();
class MultiAttributesProcessor implements IAttributesProcessor {
constructor(private readonly _processors: IAttributesProcessor[]) {}
process(incoming: Attributes, context?: Context): Attributes {
let filteredAttributes = incoming;
for (const processor of this._processors) {
filteredAttributes = processor.process(filteredAttributes, context);
}
return filteredAttributes;
}
}

class AllowListProcessor implements IAttributesProcessor {
constructor(private _allowedAttributeNames: string[]) {}

process(incoming: MetricAttributes, _context: Context): MetricAttributes {
const filteredAttributes: MetricAttributes = {};
process(incoming: Attributes, _context?: Context): Attributes {
const filteredAttributes: Attributes = {};
Object.keys(incoming)
.filter(attributeName =>
this._allowedAttributeNames.includes(attributeName)
Expand All @@ -68,4 +66,62 @@ export class FilteringAttributesProcessor extends AttributesProcessor {
}
}

class DenyListProcessor implements IAttributesProcessor {
constructor(private _deniedAttributeNames: string[]) {}

process(incoming: Attributes, _context?: Context): Attributes {
const filteredAttributes: Attributes = {};
Object.keys(incoming)
.filter(
attributeName => !this._deniedAttributeNames.includes(attributeName)
)
.forEach(
attributeName =>
(filteredAttributes[attributeName] = incoming[attributeName])
);
return filteredAttributes;
}
}

/**
* @internal
*
* Create an {@link IAttributesProcessor} that acts as a simple pass-through for attributes.
*/
export function createNoopAttributesProcessor(): IAttributesProcessor {
return NOOP;
}

/**
* @internal
*
* Create an {@link IAttributesProcessor} that applies all processors from the provided list in order.
*
* @param processors Processors to apply in order.
*/
export function createMultiAttributesProcessor(
processors: IAttributesProcessor[]
): IAttributesProcessor {
return new MultiAttributesProcessor(processors);
}

/**
* Create an {@link IAttributesProcessor} that filters by allowed attribute names and drops any names that are not in the
* allow list.
*/
export function createAllowListAttributesProcessor(
attributeAllowList: string[]
): IAttributesProcessor {
return new AllowListProcessor(attributeAllowList);
}

/**
* Create an {@link IAttributesProcessor} that drops attributes based on the names provided in the deny list
*/
export function createDenyListAttributesProcessor(
attributeDenyList: string[]
): IAttributesProcessor {
return new DenyListProcessor(attributeDenyList);
}

const NOOP = new NoopAttributesProcessor();
32 changes: 18 additions & 14 deletions packages/sdk-metrics/src/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

import { PatternPredicate } from './Predicate';
import {
AttributesProcessor,
FilteringAttributesProcessor,
createMultiAttributesProcessor,
createNoopAttributesProcessor,
IAttributesProcessor,
} from './AttributesProcessor';
import { InstrumentSelector } from './InstrumentSelector';
import { MeterSelector } from './MeterSelector';
Expand All @@ -42,15 +43,18 @@ export type ViewOptions = {
description?: string;
/**
* Alters the metric stream:
* If provided, the attributes that are not in the list will be ignored.
* If provided, the attributes will be modified as defined by the processors in the list. Processors are applied
* in the order they're provided.
* If not provided, all attribute keys will be used by default.
*
* @example <caption>drops all attributes with top-level keys except for 'myAttr' and 'myOtherAttr'</caption>
* attributeKeys: ['myAttr', 'myOtherAttr']
* attributesProcessors: [createAllowListProcessor(['myAttr', 'myOtherAttr'])]
* @example <caption>drops all attributes</caption>
* attributeKeys: []
* attributesProcessors: [createAllowListProcessor([])]
* @example <caption>allows all attributes except for 'myAttr'</caption>
* attributesProcessors: [createDenyListProcessor(['myAttr']]
*/
attributeKeys?: string[];
attributesProcessors?: IAttributesProcessor[];
/**
* Alters the metric stream:
* Alters the {@link Aggregation} of the metric stream.
Expand Down Expand Up @@ -135,7 +139,7 @@ export class View {
readonly name?: string;
readonly description?: string;
readonly aggregation: Aggregation;
readonly attributesProcessor: AttributesProcessor;
readonly attributesProcessor: IAttributesProcessor;
readonly instrumentSelector: InstrumentSelector;
readonly meterSelector: MeterSelector;

Expand All @@ -157,9 +161,9 @@ export class View {
* Alters the metric stream:
* This will be used as the description of the metrics stream.
* If not provided, the original Instrument description will be used by default.
* @param viewOptions.attributeKeys
* @param viewOptions.attributesProcessors
* Alters the metric stream:
* If provided, the attributes that are not in the list will be ignored.
* If provided, the attributes will be modified as defined by the added processors.
* If not provided, all attribute keys will be used by default.
* @param viewOptions.aggregation
* Alters the metric stream:
Expand Down Expand Up @@ -210,13 +214,13 @@ export class View {
);
}

// Create AttributesProcessor if attributeKeys are defined set.
if (viewOptions.attributeKeys != null) {
this.attributesProcessor = new FilteringAttributesProcessor(
viewOptions.attributeKeys
// Create multi-processor if attributesProcessors are defined.
if (viewOptions.attributesProcessors != null) {
this.attributesProcessor = createMultiAttributesProcessor(
viewOptions.attributesProcessors
);
} else {
this.attributesProcessor = AttributesProcessor.Noop();
this.attributesProcessor = createNoopAttributesProcessor();
}

this.name = viewOptions.name;
Expand Down
7 changes: 5 additions & 2 deletions packages/sdk-metrics/test/MeterProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { TestMetricReader } from './export/TestMetricReader';
import * as sinon from 'sinon';
import { View } from '../src/view/View';
import { Meter } from '../src/Meter';
import { createAllowListAttributesProcessor } from '../src/view/AttributesProcessor';

describe('MeterProvider', () => {
afterEach(() => {
Expand Down Expand Up @@ -200,15 +201,17 @@ describe('MeterProvider', () => {
);
});

it('with attributeKeys should drop non-listed attributes', async () => {
it('with allowListProcessor should drop non-listed attributes', async () => {
const reader = new TestMetricReader();

// Add view to drop all attributes except 'attrib1'
const meterProvider = new MeterProvider({
resource: defaultResource,
views: [
new View({
attributeKeys: ['attrib1'],
attributesProcessors: [
createAllowListAttributesProcessor(['attrib1']),
],
instrumentName: 'non-renamed-instrument',
}),
],
Expand Down
14 changes: 7 additions & 7 deletions packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { AggregationTemporality } from '../../src/export/AggregationTemporality'
import { DataPointType } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage';
import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { createNoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { ObservableRegistry } from '../../src/state/ObservableRegistry';
import {
assertMetricData,
Expand Down Expand Up @@ -49,7 +49,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[deltaCollector]
);

Expand Down Expand Up @@ -149,7 +149,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[deltaCollector]
);

Expand Down Expand Up @@ -233,7 +233,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(false),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[deltaCollector]
);

Expand Down Expand Up @@ -319,7 +319,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[cumulativeCollector]
);

Expand Down Expand Up @@ -451,7 +451,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[cumulativeCollector]
);

Expand Down Expand Up @@ -545,7 +545,7 @@ describe('AsyncMetricStorage', () => {
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(false),
new NoopAttributesProcessor(),
createNoopAttributesProcessor(),
[cumulativeCollector]
);

Expand Down
Loading

0 comments on commit 55f15e9

Please sign in to comment.