diff --git a/cmd/kas/kasapp/app_agent_server.go b/cmd/kas/kasapp/app_agent_server.go index 60f9a0ff..2450aacd 100644 --- a/cmd/kas/kasapp/app_agent_server.go +++ b/cmd/kas/kasapp/app_agent_server.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool" "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/httpz" "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz" + "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric" "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/redistool" "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/tlstool" "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/wstunnel" @@ -47,7 +48,7 @@ type agentServer struct { ready func() } -func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modserver.Api, dt trace.Tracer, +func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modserver.Api, dt trace.Tracer, dm otelmetric.Meter, tp trace.TracerProvider, mp otelmetric.MeterProvider, redisClient rueidis.Client, ssh stats.Handler, factory modserver.AgentRpcApiFactory, ownPrivateApiUrl string, probeRegistry *observability.ProbeRegistry, reg *prometheus.Registry, streamProm grpc.StreamServerInterceptor, unaryProm grpc.UnaryServerInterceptor, @@ -77,7 +78,8 @@ func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modse if err != nil { return nil, err } - agentConnectionLimiter := redistool.NewTokenLimiter( + var agentConnectionLimiter grpctool.ServerLimiter + agentConnectionLimiter = redistool.NewTokenLimiter( redisClient, cfg.Redis.KeyPrefix+":agent_limit", uint64(listenCfg.ConnectionsPerTokenPerMinute), @@ -88,6 +90,17 @@ func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modse } }, ) + agentConnectionLimiter, err = metric.NewAllowLimiterInstrumentation( + 
"agent_connection", + float64(listenCfg.ConnectionsPerTokenPerMinute), + "{connection/token/m}", + dt, + dm, + agentConnectionLimiter, + ) + if err != nil { + return nil, err + } auxCtx, auxCancel := context.WithCancel(context.Background()) traceContextProp := propagation.TraceContext{} // only want trace id, not baggage from external clients/agents keepaliveOpt, sh := grpctool.MaxConnectionAge2GrpcKeepalive(auxCtx, listenCfg.MaxConnectionAge.AsDuration()) diff --git a/cmd/kas/kasapp/configured_app.go b/cmd/kas/kasapp/configured_app.go index 18bf79e7..a88ffff2 100644 --- a/cmd/kas/kasapp/configured_app.go +++ b/cmd/kas/kasapp/configured_app.go @@ -152,7 +152,7 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) { dt := tp.Tracer(kasTracerName) // defaultTracer // GitLab REST client - gitLabClient, err := a.constructGitLabClient(tp, mp, p) + gitLabClient, err := a.constructGitLabClient(dt, dm, tp, mp, p) if err != nil { return err } @@ -186,7 +186,7 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) { } // Server for handling agentk requests - agentSrv, err := newAgentServer(a.Log, a.Configuration, srvApi, dt, tp, mp, redisClient, ssh, agentRpcApiFactory, // nolint: contextcheck + agentSrv, err := newAgentServer(a.Log, a.Configuration, srvApi, dt, dm, tp, mp, redisClient, ssh, agentRpcApiFactory, // nolint: contextcheck privateApiSrv.ownUrl, probeRegistry, reg, streamProm, unaryProm, grpcServerErrorReporter) if err != nil { return fmt.Errorf("agent server: %w", err) @@ -236,7 +236,10 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) { usageTracker := usage_metrics.NewUsageTracker() // Gitaly client - gitalyClientPool := a.constructGitalyPool(csh, tp, mp, p, streamClientProm, unaryClientProm) + gitalyClientPool, err := a.constructGitalyPool(csh, dt, dm, tp, mp, p, streamClientProm, unaryClientProm) + if err != nil { + return err + } defer errz.SafeClose(gitalyClientPool, &retErr) // Module factories @@ -430,7 +433,8 @@ 
func (a *ConfiguredApp) loadGitLabClientAuthSecret() ([]byte, error) { return decodedAuthSecret, nil } -func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelmetric.MeterProvider, p propagation.TextMapPropagator) (*gitlab.Client, error) { +func (a *ConfiguredApp) constructGitLabClient(dt trace.Tracer, dm otelmetric.Meter, + tp trace.TracerProvider, mp otelmetric.MeterProvider, p propagation.TextMapPropagator) (*gitlab.Client, error) { cfg := a.Configuration gitLabUrl, err := url.Parse(cfg.Gitlab.Address) @@ -447,6 +451,22 @@ func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelme if err != nil { return nil, fmt.Errorf("authentication secret: %w", err) } + var limiter httpz.Limiter + limiter = rate.NewLimiter( + rate.Limit(cfg.Gitlab.ApiRateLimit.RefillRatePerSecond), + int(cfg.Gitlab.ApiRateLimit.BucketSize), + ) + limiter, err = metric.NewWaitLimiterInstrumentation( + "gitlab_client", + cfg.Gitlab.ApiRateLimit.RefillRatePerSecond, + "{refill/s}", + dt, + dm, + limiter, + ) + if err != nil { + return nil, err + } return gitlab.NewClient( gitLabUrl, decodedAuthSecret, @@ -455,20 +475,30 @@ func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelme gitlab.WithMeterProvider(mp), gitlab.WithUserAgent(kasServerName()), gitlab.WithTLSConfig(clientTLSConfig), - gitlab.WithRateLimiter(rate.NewLimiter( - rate.Limit(cfg.Gitlab.ApiRateLimit.RefillRatePerSecond), - int(cfg.Gitlab.ApiRateLimit.BucketSize), - )), + gitlab.WithRateLimiter(limiter), ), nil } -func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerProvider, mp otelmetric.MeterProvider, - p propagation.TextMapPropagator, streamClientProm grpc.StreamClientInterceptor, unaryClientProm grpc.UnaryClientInterceptor) *client.Pool { +func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, dt trace.Tracer, dm otelmetric.Meter, + tp trace.TracerProvider, mp otelmetric.MeterProvider, + p propagation.TextMapPropagator, 
streamClientProm grpc.StreamClientInterceptor, unaryClientProm grpc.UnaryClientInterceptor) (*client.Pool, error) { g := a.Configuration.Gitaly - globalGitalyRpcLimiter := rate.NewLimiter( + var globalGitalyRpcLimiter grpctool.ClientLimiter + globalGitalyRpcLimiter = rate.NewLimiter( rate.Limit(g.GlobalApiRateLimit.RefillRatePerSecond), int(g.GlobalApiRateLimit.BucketSize), ) + globalGitalyRpcLimiter, err := metric.NewWaitLimiterInstrumentation( + "gitaly_client_global", + g.GlobalApiRateLimit.RefillRatePerSecond, + "{refill/s}", + dt, + dm, + globalGitalyRpcLimiter, + ) + if err != nil { + return nil, err + } return client.NewPoolWithOptions( client.WithDialOptions( grpc.WithUserAgent(kasServerName()), @@ -492,9 +522,21 @@ func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerPr // Don't put interceptors here as order is important. Put them below. ), client.WithDialer(func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { - perServerGitalyRpcLimiter := rate.NewLimiter( + var perServerGitalyRpcLimiter grpctool.ClientLimiter + perServerGitalyRpcLimiter = rate.NewLimiter( rate.Limit(g.PerServerApiRateLimit.RefillRatePerSecond), int(g.PerServerApiRateLimit.BucketSize)) + perServerGitalyRpcLimiter, err := metric.NewWaitLimiterInstrumentation( + "gitaly_client_"+address, + g.PerServerApiRateLimit.RefillRatePerSecond, + "{refill/s}", + dt, + dm, + perServerGitalyRpcLimiter, + ) + if err != nil { + return nil, err + } opts := []grpc.DialOption{ grpc.WithChainStreamInterceptor( streamClientProm, @@ -510,7 +552,7 @@ func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerPr opts = append(opts, dialOptions...)
return client.DialContext(ctx, address, opts) }), - ) + ), nil } func (a *ConfiguredApp) constructRedisClient(tp trace.TracerProvider, mp otelmetric.MeterProvider) (rueidis.Client, error) { @@ -715,19 +757,19 @@ func constructOTELResource() (*resource.Resource, error) { resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String(kasName), - semconv.ServiceVersionKey.String(cmd.Version), + semconv.ServiceName(kasName), + semconv.ServiceVersion(cmd.Version), ), ) } func gitlabBuildInfoGauge(m otelmetric.Meter) error { + // Only allocate the option once + attributes := otelmetric.WithAttributeSet(attribute.NewSet(kasVersionAttr.String(cmd.Version), kasBuiltAttr.String(cmd.BuildTime))) _, err := m.Int64ObservableGauge(gitlabBuildInfoGaugeMetricName, otelmetric.WithDescription("Current build info for this GitLab Service"), otelmetric.WithInt64Callback(func(ctx context.Context, observer otelmetric.Int64Observer) error { - observer.Observe(1, - otelmetric.WithAttributes(kasVersionAttr.String(cmd.Version), kasBuiltAttr.String(cmd.BuildTime)), - ) + observer.Observe(1, attributes) return nil }), ) diff --git a/internal/tool/metric/BUILD.bazel b/internal/tool/metric/BUILD.bazel index 5fe5c36c..90e6292b 100644 --- a/internal/tool/metric/BUILD.bazel +++ b/internal/tool/metric/BUILD.bazel @@ -3,14 +3,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "metric", srcs = [ + "allow_limiter.go", "counter.go", + "limiter_wrapper.go", "metric.go", + "wait_limiter.go", ], importpath = "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric", visibility = ["//:__subpackages__"], deps = [ "//internal/tool/logz", "@com_github_prometheus_client_golang//prometheus", + "@io_opentelemetry_go_otel//attribute", + "@io_opentelemetry_go_otel//codes", + "@io_opentelemetry_go_otel_metric//:metric", + "@io_opentelemetry_go_otel_trace//:trace", "@org_uber_go_zap//:zap", ], ) diff --git 
a/internal/tool/metric/allow_limiter.go b/internal/tool/metric/allow_limiter.go new file mode 100644 index 00000000..148bb22d --- /dev/null +++ b/internal/tool/metric/allow_limiter.go @@ -0,0 +1,40 @@ +package metric + +import ( + "context" + + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +var ( + _ AllowLimiter = (*AllowLimiterInstrumentation)(nil) +) + +type AllowLimiter interface { + Allow(context.Context) bool +} + +type AllowLimiterInstrumentation struct { + wrapper *LimiterWrapper + delegate AllowLimiter +} + +func NewAllowLimiterInstrumentation(limiterName string, limit float64, limitUnit string, tr trace.Tracer, m otelmetric.Meter, delegate AllowLimiter) (*AllowLimiterInstrumentation, error) { + w, err := NewLimiterWrapper(limiterName, limit, limitUnit, m, tr) + if err != nil { + return nil, err + } + return &AllowLimiterInstrumentation{ + wrapper: w, + delegate: delegate, + }, nil +} + +func (i *AllowLimiterInstrumentation) Allow(ctx context.Context) (allowed bool) { + ctx, done := i.wrapper.Start(ctx) + defer func() { // to handle panics + done(allowed) + }() + return i.delegate.Allow(ctx) +} diff --git a/internal/tool/metric/limiter_wrapper.go b/internal/tool/metric/limiter_wrapper.go new file mode 100644 index 00000000..9e9a64d4 --- /dev/null +++ b/internal/tool/metric/limiter_wrapper.go @@ -0,0 +1,80 @@ +package metric + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +const ( + rateLimiterBlockDurationName = "limiter_block_duration" + rateLimiterLimitName = "limiter_limit" + allowedAttr attribute.Key = "allowed" + limiterNameAttr attribute.Key = "limiter_name" + limiterLimitUnitAttr attribute.Key = "unit" +) + +type LimiterWrapper struct { + limiterName string + tr trace.Tracer + hist otelmetric.Float64Histogram +} + +func NewLimiterWrapper(limiterName string, limit 
float64, limitUnit string, m otelmetric.Meter, tr trace.Tracer) (*LimiterWrapper, error) { + limitAttrs := otelmetric.WithAttributeSet(attribute.NewSet( + limiterNameAttr.String(limiterName), + limiterLimitUnitAttr.String(limitUnit), + )) // allocate once + _, err := m.Float64ObservableGauge( + rateLimiterLimitName, + otelmetric.WithDescription("Limit for the rate limiter"), + otelmetric.WithUnit(limitUnit), + otelmetric.WithFloat64Callback(func(ctx context.Context, observer otelmetric.Float64Observer) error { + observer.Observe(limit, limitAttrs) + return nil + }), + ) + if err != nil { + return nil, err + } + hist, err := m.Float64Histogram( + rateLimiterBlockDurationName, + // TODO switch to "seconds" once API to set histogram's buckets is available + // See https://github.com/open-telemetry/opentelemetry-go/issues/4094 + otelmetric.WithUnit("ms"), + otelmetric.WithDescription("Duration the rate limiter blocked for deciding to allow/block the call"), + ) + if err != nil { + return nil, err + } + return &LimiterWrapper{ + limiterName: limiterName, + tr: tr, + hist: hist, + }, nil +} + +func (w *LimiterWrapper) Start(ctx context.Context) (context.Context, func(allowed bool)) { + start := time.Now() + ctx, span := w.tr.Start(ctx, "limiter", trace.WithSpanKind(trace.SpanKindInternal)) + return ctx, func(allowed bool) { + // TODO switch to "seconds" once API to set histogram's buckets is available + // See https://github.com/open-telemetry/opentelemetry-go/issues/4094 + duration := float64(time.Since(start)) / float64(time.Millisecond) + // Pass background context because we always want to record the duration. 
+ w.hist.Record(context.Background(), duration, otelmetric.WithAttributeSet(attribute.NewSet( + allowedAttr.Bool(allowed), + limiterNameAttr.String(w.limiterName), + ))) + if allowed { + span.SetStatus(codes.Ok, "") + } else { + span.SetStatus(codes.Error, "") + } + span.End() + } +} diff --git a/internal/tool/metric/wait_limiter.go b/internal/tool/metric/wait_limiter.go new file mode 100644 index 00000000..020cb858 --- /dev/null +++ b/internal/tool/metric/wait_limiter.go @@ -0,0 +1,43 @@ +package metric + +import ( + "context" + + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +var ( + _ WaitLimiter = (*WaitLimiterInstrumentation)(nil) +) + +type WaitLimiter interface { + Wait(context.Context) error +} + +type WaitLimiterInstrumentation struct { + wrapper *LimiterWrapper + delegate WaitLimiter +} + +func NewWaitLimiterInstrumentation(limiterName string, limit float64, limitUnit string, tr trace.Tracer, m otelmetric.Meter, delegate WaitLimiter) (*WaitLimiterInstrumentation, error) { + w, err := NewLimiterWrapper(limiterName, limit, limitUnit, m, tr) + if err != nil { + return nil, err + } + return &WaitLimiterInstrumentation{ + wrapper: w, + delegate: delegate, + }, nil +} + +func (i *WaitLimiterInstrumentation) Wait(ctx context.Context) error { + ctx, done := i.wrapper.Start(ctx) + allowed := false + defer func() { // to handle panics + done(allowed) + }() + err := i.delegate.Wait(ctx) + allowed = err == nil + return err +}