Merge branch 'ash2k/limiter' into 'master'
Rate limiter instrumentation

Closes #154

See merge request https://gitlab.com/gitlab-org/cluster-integration/gitlab-agent/-/merge_requests/1149

Merged-by: Mikhail Mazurskiy <[email protected]>
Approved-by: Taka Nishida <[email protected]>
ash2k committed Oct 16, 2023
2 parents 0e29f76 + da8f137 commit 8daa08c
Showing 6 changed files with 245 additions and 20 deletions.
17 changes: 15 additions & 2 deletions cmd/kas/kasapp/app_agent_server.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/httpz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/redistool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/tlstool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/wstunnel"
@@ -47,7 +48,7 @@ type agentServer struct {
ready func()
}

func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modserver.Api, dt trace.Tracer,
func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modserver.Api, dt trace.Tracer, dm otelmetric.Meter,
tp trace.TracerProvider, mp otelmetric.MeterProvider, redisClient rueidis.Client, ssh stats.Handler, factory modserver.AgentRpcApiFactory,
ownPrivateApiUrl string, probeRegistry *observability.ProbeRegistry, reg *prometheus.Registry,
streamProm grpc.StreamServerInterceptor, unaryProm grpc.UnaryServerInterceptor,
@@ -77,7 +78,8 @@ func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modse
if err != nil {
return nil, err
}
agentConnectionLimiter := redistool.NewTokenLimiter(
var agentConnectionLimiter grpctool.ServerLimiter
agentConnectionLimiter = redistool.NewTokenLimiter(
redisClient,
cfg.Redis.KeyPrefix+":agent_limit",
uint64(listenCfg.ConnectionsPerTokenPerMinute),
@@ -88,6 +90,17 @@ func newAgentServer(log *zap.Logger, cfg *kascfg.ConfigurationFile, srvApi modse
}
},
)
agentConnectionLimiter, err = metric.NewAllowLimiterInstrumentation(
"agent_connection",
float64(listenCfg.ConnectionsPerTokenPerMinute),
"{connection/token/m}",
dt,
dm,
agentConnectionLimiter,
)
if err != nil {
return nil, err
}
auxCtx, auxCancel := context.WithCancel(context.Background())
traceContextProp := propagation.TraceContext{} // only want trace id, not baggage from external clients/agents
keepaliveOpt, sh := grpctool.MaxConnectionAge2GrpcKeepalive(auxCtx, listenCfg.MaxConnectionAge.AsDuration())
78 changes: 60 additions & 18 deletions cmd/kas/kasapp/configured_app.go
@@ -152,7 +152,7 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) {
dt := tp.Tracer(kasTracerName) // defaultTracer

// GitLab REST client
gitLabClient, err := a.constructGitLabClient(tp, mp, p)
gitLabClient, err := a.constructGitLabClient(dt, dm, tp, mp, p)
if err != nil {
return err
}
@@ -186,7 +186,7 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) {
}

// Server for handling agentk requests
agentSrv, err := newAgentServer(a.Log, a.Configuration, srvApi, dt, tp, mp, redisClient, ssh, agentRpcApiFactory, // nolint: contextcheck
agentSrv, err := newAgentServer(a.Log, a.Configuration, srvApi, dt, dm, tp, mp, redisClient, ssh, agentRpcApiFactory, // nolint: contextcheck
privateApiSrv.ownUrl, probeRegistry, reg, streamProm, unaryProm, grpcServerErrorReporter)
if err != nil {
return fmt.Errorf("agent server: %w", err)
@@ -236,7 +236,10 @@ func (a *ConfiguredApp) Run(ctx context.Context) (retErr error) {
usageTracker := usage_metrics.NewUsageTracker()

// Gitaly client
gitalyClientPool := a.constructGitalyPool(csh, tp, mp, p, streamClientProm, unaryClientProm)
gitalyClientPool, err := a.constructGitalyPool(csh, dt, dm, tp, mp, p, streamClientProm, unaryClientProm)
if err != nil {
return err
}
defer errz.SafeClose(gitalyClientPool, &retErr)

// Module factories
Expand Down Expand Up @@ -430,7 +433,8 @@ func (a *ConfiguredApp) loadGitLabClientAuthSecret() ([]byte, error) {
return decodedAuthSecret, nil
}

func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelmetric.MeterProvider, p propagation.TextMapPropagator) (*gitlab.Client, error) {
func (a *ConfiguredApp) constructGitLabClient(dt trace.Tracer, dm otelmetric.Meter,
tp trace.TracerProvider, mp otelmetric.MeterProvider, p propagation.TextMapPropagator) (*gitlab.Client, error) {
cfg := a.Configuration

gitLabUrl, err := url.Parse(cfg.Gitlab.Address)
@@ -447,6 +451,22 @@ func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelme
if err != nil {
return nil, fmt.Errorf("authentication secret: %w", err)
}
var limiter httpz.Limiter
limiter = rate.NewLimiter(
rate.Limit(cfg.Gitlab.ApiRateLimit.RefillRatePerSecond),
int(cfg.Gitlab.ApiRateLimit.BucketSize),
)
limiter, err = metric.NewWaitLimiterInstrumentation(
"gitlab_client",
cfg.Gitlab.ApiRateLimit.RefillRatePerSecond,
"{refill/s}",
dt,
dm,
limiter,
)
if err != nil {
return nil, err
}
return gitlab.NewClient(
gitLabUrl,
decodedAuthSecret,
@@ -455,20 +475,30 @@ func (a *ConfiguredApp) constructGitLabClient(tp trace.TracerProvider, mp otelme
gitlab.WithMeterProvider(mp),
gitlab.WithUserAgent(kasServerName()),
gitlab.WithTLSConfig(clientTLSConfig),
gitlab.WithRateLimiter(rate.NewLimiter(
rate.Limit(cfg.Gitlab.ApiRateLimit.RefillRatePerSecond),
int(cfg.Gitlab.ApiRateLimit.BucketSize),
)),
gitlab.WithRateLimiter(limiter),
), nil
}

func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerProvider, mp otelmetric.MeterProvider,
p propagation.TextMapPropagator, streamClientProm grpc.StreamClientInterceptor, unaryClientProm grpc.UnaryClientInterceptor) *client.Pool {
func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, dt trace.Tracer, dm otelmetric.Meter,
tp trace.TracerProvider, mp otelmetric.MeterProvider,
p propagation.TextMapPropagator, streamClientProm grpc.StreamClientInterceptor, unaryClientProm grpc.UnaryClientInterceptor) (*client.Pool, error) {
g := a.Configuration.Gitaly
globalGitalyRpcLimiter := rate.NewLimiter(
var globalGitalyRpcLimiter grpctool.ClientLimiter
globalGitalyRpcLimiter = rate.NewLimiter(
rate.Limit(g.GlobalApiRateLimit.RefillRatePerSecond),
int(g.GlobalApiRateLimit.BucketSize),
)
globalGitalyRpcLimiter, err := metric.NewWaitLimiterInstrumentation(
"gitaly_client_global",
g.GlobalApiRateLimit.RefillRatePerSecond,
"{refill/s}",
dt,
dm,
globalGitalyRpcLimiter,
)
if err != nil {
return nil, err
}
return client.NewPoolWithOptions(
client.WithDialOptions(
grpc.WithUserAgent(kasServerName()),
@@ -492,9 +522,21 @@ func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerPr
// Don't put interceptors here as order is important. Put them below.
),
client.WithDialer(func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
perServerGitalyRpcLimiter := rate.NewLimiter(
var perServerGitalyRpcLimiter grpctool.ClientLimiter
perServerGitalyRpcLimiter = rate.NewLimiter(
rate.Limit(g.PerServerApiRateLimit.RefillRatePerSecond),
int(g.PerServerApiRateLimit.BucketSize))
perServerGitalyRpcLimiter, err := metric.NewWaitLimiterInstrumentation(
"gitaly_client_"+address,
g.GlobalApiRateLimit.RefillRatePerSecond,
"{refill/s}",
dt,
dm,
perServerGitalyRpcLimiter,
)
if err != nil {
return nil, err
}
opts := []grpc.DialOption{
grpc.WithChainStreamInterceptor(
streamClientProm,
@@ -510,7 +552,7 @@ func (a *ConfiguredApp) constructGitalyPool(csh stats.Handler, tp trace.TracerPr
opts = append(opts, dialOptions...)
return client.DialContext(ctx, address, opts)
}),
)
), nil
}

func (a *ConfiguredApp) constructRedisClient(tp trace.TracerProvider, mp otelmetric.MeterProvider) (rueidis.Client, error) {
@@ -715,19 +757,19 @@ func constructOTELResource() (*resource.Resource, error) {
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(kasName),
semconv.ServiceVersionKey.String(cmd.Version),
semconv.ServiceName(kasName),
semconv.ServiceVersion(cmd.Version),
),
)
}

func gitlabBuildInfoGauge(m otelmetric.Meter) error {
// Only allocate the option once
attributes := otelmetric.WithAttributeSet(attribute.NewSet(kasVersionAttr.String(cmd.Version), kasBuiltAttr.String(cmd.BuildTime)))
_, err := m.Int64ObservableGauge(gitlabBuildInfoGaugeMetricName,
otelmetric.WithDescription("Current build info for this GitLab Service"),
otelmetric.WithInt64Callback(func(ctx context.Context, observer otelmetric.Int64Observer) error {
observer.Observe(1,
otelmetric.WithAttributes(kasVersionAttr.String(cmd.Version), kasBuiltAttr.String(cmd.BuildTime)),
)
observer.Observe(1, attributes)
return nil
}),
)
7 changes: 7 additions & 0 deletions internal/tool/metric/BUILD.bazel
@@ -3,14 +3,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "metric",
srcs = [
"allow_limiter.go",
"counter.go",
"limiter_wrapper.go",
"metric.go",
"wait_limiter.go",
],
importpath = "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric",
visibility = ["//:__subpackages__"],
deps = [
"//internal/tool/logz",
"@com_github_prometheus_client_golang//prometheus",
"@io_opentelemetry_go_otel//attribute",
"@io_opentelemetry_go_otel//codes",
"@io_opentelemetry_go_otel_metric//:metric",
"@io_opentelemetry_go_otel_trace//:trace",
"@org_uber_go_zap//:zap",
],
)
40 changes: 40 additions & 0 deletions internal/tool/metric/allow_limiter.go
@@ -0,0 +1,40 @@
package metric

import (
"context"

otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

var (
_ AllowLimiter = (*AllowLimiterInstrumentation)(nil)
)

type AllowLimiter interface {
Allow(context.Context) bool
}

type AllowLimiterInstrumentation struct {
wrapper *LimiterWrapper
delegate AllowLimiter
}

func NewAllowLimiterInstrumentation(limiterName string, limit float64, limitUnit string, tr trace.Tracer, m otelmetric.Meter, delegate AllowLimiter) (*AllowLimiterInstrumentation, error) {
w, err := NewLimiterWrapper(limiterName, limit, limitUnit, m, tr)
if err != nil {
return nil, err
}
return &AllowLimiterInstrumentation{
wrapper: w,
delegate: delegate,
}, nil
}

func (i *AllowLimiterInstrumentation) Allow(ctx context.Context) (allowed bool) {
ctx, done := i.wrapper.Start(ctx)
defer func() { // to handle panics
done(allowed)
}()
return i.delegate.Allow(ctx)
}
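
For reference, a minimal sketch of how the Allow-side wrapper is used. app_agent_server.go above wraps the Redis-backed token limiter; here a hypothetical tokenBucket adapter around golang.org/x/time/rate stands in for it, the otel global providers supply the tracer and meter, and the snippet is assumed to live inside the module (the metric package is internal), e.g. as an example test:

package metric_test // hypothetical test file next to the package above

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"golang.org/x/time/rate"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric"
)

// tokenBucket is a hypothetical adapter giving *rate.Limiter the
// context-aware Allow signature that metric.AllowLimiter expects.
type tokenBucket struct{ l *rate.Limiter }

func (t tokenBucket) Allow(_ context.Context) bool { return t.l.Allow() }

func ExampleNewAllowLimiterInstrumentation() {
	limiter, err := metric.NewAllowLimiterInstrumentation(
		"example_connection",   // limiter_name attribute on the emitted span and metrics
		600,                    // advertised limit, exported via the limiter_limit gauge
		"{connection/token/m}", // unit annotation, mirroring the agent server wiring above
		otel.Tracer("example"),
		otel.Meter("example"),
		tokenBucket{l: rate.NewLimiter(10, 600)},
	)
	if err != nil {
		panic(err)
	}
	// Each call is wrapped in a "limiter" span and a block-duration histogram sample.
	fmt.Println(limiter.Allow(context.Background()))
	// Output: true
}

The first argument becomes the limiter_name attribute carried by both the limiter_limit gauge and the limiter_block_duration histogram defined in limiter_wrapper.go below.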
80 changes: 80 additions & 0 deletions internal/tool/metric/limiter_wrapper.go
@@ -0,0 +1,80 @@
package metric

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

const (
rateLimiterBlockDurationName = "limiter_block_duration"
rateLimiterLimitName = "limiter_limit"
allowedAttr attribute.Key = "allowed"
limiterNameAttr attribute.Key = "limiter_name"
limiterLimitUnitAttr attribute.Key = "unit"
)

type LimiterWrapper struct {
limiterName string
tr trace.Tracer
hist otelmetric.Float64Histogram
}

func NewLimiterWrapper(limiterName string, limit float64, limitUnit string, m otelmetric.Meter, tr trace.Tracer) (*LimiterWrapper, error) {
limitAttrs := otelmetric.WithAttributeSet(attribute.NewSet(
limiterNameAttr.String(limiterName),
limiterLimitUnitAttr.String(limitUnit),
)) // allocate once
_, err := m.Float64ObservableGauge(
rateLimiterLimitName,
otelmetric.WithDescription("Limit for the rate limiter"),
otelmetric.WithUnit(limitUnit),
otelmetric.WithFloat64Callback(func(ctx context.Context, observer otelmetric.Float64Observer) error {
observer.Observe(limit, limitAttrs)
return nil
}),
)
if err != nil {
return nil, err
}
hist, err := m.Float64Histogram(
rateLimiterBlockDurationName,
// TODO switch to "seconds" once API to set histogram's buckets is available
// See https://github.com/open-telemetry/opentelemetry-go/issues/4094
otelmetric.WithUnit("ms"),
otelmetric.WithDescription("Duration the rate limiter blocked for deciding to allow/block the call"),
)
if err != nil {
return nil, err
}
return &LimiterWrapper{
limiterName: limiterName,
tr: tr,
hist: hist,
}, nil
}

func (w *LimiterWrapper) Start(ctx context.Context) (context.Context, func(allowed bool)) {
start := time.Now()
ctx, span := w.tr.Start(ctx, "limiter", trace.WithSpanKind(trace.SpanKindInternal))
return ctx, func(allowed bool) {
// TODO switch to "seconds" once API to set histogram's buckets is available
// See https://github.com/open-telemetry/opentelemetry-go/issues/4094
duration := float64(time.Since(start)) / float64(time.Millisecond)
// Pass background context because we always want to record the duration.
w.hist.Record(context.Background(), duration, otelmetric.WithAttributeSet(attribute.NewSet(
allowedAttr.Bool(allowed),
limiterNameAttr.String(w.limiterName),
)))
if allowed {
span.SetStatus(codes.Ok, "")
} else {
span.SetStatus(codes.Error, "")
}
span.End()
}
}
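
The Start/done pair above is the whole contract: Start opens a "limiter" span, and done records how long the decision blocked (on a background context, so the sample is kept even if the caller's context was cancelled) and sets the span status from the allow/deny outcome. A sketch of reusing it for a limiter shape other than Allow or Wait — InstrumentCheck is a hypothetical helper, not part of this merge request:

package metric // hypothetical addition alongside the wrappers in this merge request

import "context"

// InstrumentCheck shows the Start/done contract: any boolean rate-limit
// decision can be traced and timed the same way the Allow and Wait wrappers do.
func InstrumentCheck(ctx context.Context, w *LimiterWrapper, check func(context.Context) bool) (allowed bool) {
	ctx, done := w.Start(ctx)
	// Deferred so the span is ended and the histogram sample recorded even if check panics.
	defer func() { done(allowed) }()
	allowed = check(ctx)
	return allowed
}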
43 changes: 43 additions & 0 deletions internal/tool/metric/wait_limiter.go
@@ -0,0 +1,43 @@
package metric

import (
"context"

otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

var (
_ WaitLimiter = (*WaitLimiterInstrumentation)(nil)
)

type WaitLimiter interface {
Wait(context.Context) error
}

type WaitLimiterInstrumentation struct {
wrapper *LimiterWrapper
delegate WaitLimiter
}

func NewWaitLimiterInstrumentation(limiterName string, limit float64, limitUnit string, tr trace.Tracer, m otelmetric.Meter, delegate WaitLimiter) (*WaitLimiterInstrumentation, error) {
w, err := NewLimiterWrapper(limiterName, limit, limitUnit, m, tr)
if err != nil {
return nil, err
}
return &WaitLimiterInstrumentation{
wrapper: w,
delegate: delegate,
}, nil
}

func (i *WaitLimiterInstrumentation) Wait(ctx context.Context) error {
ctx, done := i.wrapper.Start(ctx)
allowed := false
defer func() { // to handle panics
done(allowed)
}()
err := i.delegate.Wait(ctx)
allowed = err == nil
return err
}
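And the Wait-side wrapper, which constructGitLabClient and constructGitalyPool above apply to their token buckets. A minimal sketch with placeholder names and values; *rate.Limiter already satisfies WaitLimiter through its Wait(ctx) error method, and the snippet is assumed to live inside the module since the metric package is internal:

package kasapp // hypothetical placement inside the module

import (
	"context"

	"go.opentelemetry.io/otel"
	"golang.org/x/time/rate"

	"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/metric"
)

func exampleWaitLimiter(ctx context.Context) error {
	// *rate.Limiter satisfies metric.WaitLimiter via its Wait(ctx) error method.
	var limiter metric.WaitLimiter = rate.NewLimiter(rate.Limit(10), 50)
	limiter, err := metric.NewWaitLimiterInstrumentation(
		"example_client", // limiter_name attribute
		10,               // refill rate, exported via the limiter_limit gauge
		"{refill/s}",     // unit annotation, matching the GitLab/Gitaly client wiring above
		otel.Tracer("example"),
		otel.Meter("example"),
		limiter,
	)
	if err != nil {
		return err
	}
	// Every Wait is traced; time spent blocked lands in the limiter_block_duration histogram.
	return limiter.Wait(ctx)
}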
