diff --git a/pkg/api/api.go b/pkg/api/api.go index e2f6da5735c..a60b2fb80d5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -259,13 +259,14 @@ const OTLPPushEndpoint = "/otlp/v1/metrics" // RegisterDistributor registers the endpoints associated with the distributor. func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) + limits.SpecializeResourceAttributePromotionConfig(pushConfig.OTelResourceAttributePromotionConfig) a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler( pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger, ), true, false, "POST") a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler( - pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig, + pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, ), true, false, "POST") diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b17537465bd..5e29a23010e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -191,14 +191,8 @@ type Distributor struct { func defaultSleep(d time.Duration) { time.Sleep(d) } func defaultNow() time.Time { return time.Now() } -// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion. -type OTelResourceAttributePromotionConfig interface { - // PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID. - PromoteOTelResourceAttributes(id string) []string -} - // Config contains the configuration required to -// create a Distributor +// create a Distributor. type Config struct { PoolConfig PoolConfig `yaml:"pool"` @@ -247,7 +241,7 @@ type Config struct { ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"` // OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion. - OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"` + OTelResourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig `yaml:"-"` } // PushWrapper wraps around a push. It is similar to middleware.Interface. diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 79f02bb1bbe..5bc2c35dab7 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -48,9 +48,10 @@ const ( ) type OTLPHandlerLimits interface { + validation.OTelResourceAttributePromotionConfig + OTelMetricSuffixesEnabled(id string) bool OTelCreatedTimestampZeroIngestionEnabled(id string) bool - PromoteOTelResourceAttributes(id string) []string OTelKeepIdentifyingResourceAttributes(id string) bool } @@ -60,7 +61,6 @@ func OTLPHandler( requestBufferPool util.Pool, sourceIPs *middleware.SourceIPExtractor, limits OTLPHandlerLimits, - resourceAttributePromotionConfig OTelResourceAttributePromotionConfig, retryCfg RetryConfig, push PushFunc, pushMetrics *PushMetrics, @@ -171,10 +171,7 @@ func OTLPHandler( } addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID) enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID) - if resourceAttributePromotionConfig == nil { - resourceAttributePromotionConfig = limits - } - promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID) + promoteResourceAttributes := limits.PromoteOTelResourceAttributes(tenantID) keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID) pushMetrics.IncOTLPRequest(tenantID) diff --git a/pkg/distributor/otel_test.go b/pkg/distributor/otel_test.go index c381660a76b..6d723bf58cf 100644 --- a/pkg/distributor/otel_test.go +++ b/pkg/distributor/otel_test.go @@ -351,7 +351,7 @@ func BenchmarkOTLPHandler(b *testing.B) { validation.NewMockTenantLimits(map[string]*validation.Limits{}), ) require.NoError(b, err) - handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger()) + handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger()) b.Run("protobuf", func(b *testing.B) { req := createOTLPProtoRequest(b, exportReq, "") @@ -491,7 +491,7 @@ func TestHandlerOTLPPush(t *testing.T) { expectedRetryHeader bool promoteResourceAttributes []string expectedAttributePromotions map[string]string - resourceAttributePromotionConfig OTelResourceAttributePromotionConfig + resourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig } samplesVerifierFunc := func(t *testing.T, pushReq *Request, tc testCase) error { @@ -742,6 +742,8 @@ func TestHandlerOTLPPush(t *testing.T) { }), ) require.NoError(t, err) + limits.SpecializeResourceAttributePromotionConfig(tt.resourceAttributePromotionConfig) + pusher := func(_ context.Context, pushReq *Request) error { t.Helper() t.Cleanup(pushReq.CleanUp) @@ -750,7 +752,7 @@ func TestHandlerOTLPPush(t *testing.T) { logs := &concurrency.SyncBuffer{} retryConfig := RetryConfig{Enabled: true, MinBackoff: 5 * time.Second, MaxBackoff: 5 * time.Second} - handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, tt.resourceAttributePromotionConfig, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo())) + handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo())) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) @@ -823,7 +825,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) { req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "") resp := httptest.NewRecorder() - handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error { + handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() assert.NoError(t, err) assert.Len(t, request.Timeseries, 3) @@ -869,7 +871,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "") resp := httptest.NewRecorder() - handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error { + handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() t.Cleanup(pushReq.CleanUp) require.NoError(t, err) @@ -895,7 +897,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { req = createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "") resp = httptest.NewRecorder() - handler = OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error { + handler = OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() t.Cleanup(pushReq.CleanUp) require.NoError(t, err) @@ -923,7 +925,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) { resp := httptest.NewRecorder() - handler := OTLPHandler(140, nil, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger()) + handler := OTLPHandler(140, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger()) handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code) body, err := io.ReadAll(resp.Body) diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 58e4704b2b8..11da31b7b9c 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) { return nil } - h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, nil, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger()) + h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger()) srv.HTTP.Handle("/otlp", h) // start the server diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index b12fc465fd0..0e3d815fb47 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -506,6 +506,9 @@ type TenantLimits interface { type Overrides struct { defaultLimits *Limits tenantLimits TenantLimits + + // otelResourceAttributePromotionCfg, if set, specializes OTel resource attribute promotion configuration. + otelResourceAttributePromotionCfg OTelResourceAttributePromotionConfig } // NewOverrides makes a new Overrides. @@ -1112,9 +1115,18 @@ func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bo } func (o *Overrides) PromoteOTelResourceAttributes(tenantID string) []string { + if o.otelResourceAttributePromotionCfg != nil { + return o.otelResourceAttributePromotionCfg.PromoteOTelResourceAttributes(tenantID) + } return o.getOverridesForUser(tenantID).PromoteOTelResourceAttributes } +// SpecializeResourceAttributePromotionConfig specializes OTel resource attribute promotion configuration. +// This is to allow for plugging in a non-default method for configuring resource attribute promotion. +func (o *Overrides) SpecializeResourceAttributePromotionConfig(specialization OTelResourceAttributePromotionConfig) { + o.otelResourceAttributePromotionCfg = specialization +} + func (o *Overrides) OTelKeepIdentifyingResourceAttributes(tenantID string) bool { return o.getOverridesForUser(tenantID).OTelKeepIdentifyingResourceAttributes } @@ -1147,6 +1159,12 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits { return o.defaultLimits } +// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion. +type OTelResourceAttributePromotionConfig interface { + // PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID. + PromoteOTelResourceAttributes(id string) []string +} + // AllTrueBooleansPerTenant returns true only if limit func is true for all given tenants func AllTrueBooleansPerTenant(tenantIDs []string, f func(string) bool) bool { for _, tenantID := range tenantIDs { diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 9dc82df2d05..81511cd264b 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -1399,6 +1399,43 @@ alertmanager_max_grafana_state_size_bytes: "0" } } +func TestOverrides_PromoteOTelResourceAttributes(t *testing.T) { + const tenant = "tenant" + + t.Run("default implementation", func(t *testing.T) { + overrides, err := NewOverrides( + Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}}, + NewMockTenantLimits(map[string]*Limits{}), + ) + require.NoError(t, err) + + attrs := overrides.PromoteOTelResourceAttributes(tenant) + require.Equal(t, []string{"default.attribute"}, attrs) + }) + + t.Run("specialized implementation", func(t *testing.T) { + overrides, err := NewOverrides( + Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}}, + NewMockTenantLimits(map[string]*Limits{}), + ) + require.NoError(t, err) + + overrides.SpecializeResourceAttributePromotionConfig(fakeOTelResourceAttributePromotionConfig{ + resourceAttrs: map[string][]string{tenant: {"specialized.attribute"}}, + }) + attrs := overrides.PromoteOTelResourceAttributes(tenant) + require.Equal(t, []string{"specialized.attribute"}, attrs) + }) +} + +type fakeOTelResourceAttributePromotionConfig struct { + resourceAttrs map[string][]string +} + +func (c fakeOTelResourceAttributePromotionConfig) PromoteOTelResourceAttributes(tenant string) []string { + return c.resourceAttrs[tenant] +} + func getDefaultLimits() Limits { limits := Limits{} flagext.DefaultValues(&limits)