From 501308e8a27202752f2f1d59edcbe10fce99d0bf Mon Sep 17 00:00:00 2001
From: Lulu Zhang
Date: Fri, 9 Aug 2024 16:17:20 -0700
Subject: [PATCH] [raft] report partition size and file system size (#7215)

---
 enterprise/server/raft/store/store.go        |  4 ++
 enterprise/server/raft/usagetracker/BUILD    |  1 +
 .../server/raft/usagetracker/usagetracker.go | 56 +++++++++++++++++--
 3 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go
index a4d3dbe8029..27f87e2dbce 100644
--- a/enterprise/server/raft/store/store.go
+++ b/enterprise/server/raft/store/store.go
@@ -654,6 +654,10 @@ func (s *Store) Sample(ctx context.Context, rangeID uint64, partition string, n
 	return r.Sample(ctx, partition, n)
 }
 
+func (s *Store) GetRootDir() string {
+	return s.rootDir
+}
+
 func (s *Store) replicaForRange(rangeID uint64) (*replica.Replica, *rfpb.RangeDescriptor, error) {
 	s.rangeMu.RLock()
 	rd, rangeOK := s.openRanges[rangeID]
diff --git a/enterprise/server/raft/usagetracker/BUILD b/enterprise/server/raft/usagetracker/BUILD
index 41e9ab79508..a09e252b223 100644
--- a/enterprise/server/raft/usagetracker/BUILD
+++ b/enterprise/server/raft/usagetracker/BUILD
@@ -20,6 +20,7 @@ go_library(
         "//server/util/proto",
         "//server/util/status",
         "@com_github_docker_go_units//:go-units",
+        "@com_github_elastic_gosigar//:gosigar",
         "@com_github_hashicorp_serf//serf",
         "@com_github_prometheus_client_golang//prometheus",
         "@org_golang_x_sync//errgroup",
diff --git a/enterprise/server/raft/usagetracker/usagetracker.go b/enterprise/server/raft/usagetracker/usagetracker.go
index 6a84792e6b1..3ce92f8bf55 100644
--- a/enterprise/server/raft/usagetracker/usagetracker.go
+++ b/enterprise/server/raft/usagetracker/usagetracker.go
@@ -23,6 +23,7 @@ import (
 	"github.com/buildbuddy-io/buildbuddy/server/util/log"
 	"github.com/buildbuddy-io/buildbuddy/server/util/proto"
 	"github.com/buildbuddy-io/buildbuddy/server/util/status"
+	"github.com/elastic/gosigar"
 
 	"github.com/docker/go-units"
 	"github.com/hashicorp/serf/serf"
@@ -36,6 +37,7 @@ var (
 	partitionUsageDeltaGossipThreshold = flag.Int("cache.raft.partition_usage_delta_bytes_threshold", 100e6, "Gossip partition usage information if it has changed by more than this amount since the last gossip.")
 	samplesPerEviction                 = flag.Int("cache.raft.samples_per_eviction", 20, "How many records to sample on each eviction")
 	samplePoolSize                     = flag.Int("cache.raft.sample_pool_size", 500, "How many deletion candidates to maintain between evictions")
+	metricsRefreshPeriod               = 30 * time.Second
 )
 
 const (
@@ -54,11 +56,15 @@
 
 	// How old store partition usage data can be before we consider it invalid.
 	storePartitionStalenessLimit = storePartitionUsageMaxAge * 2
+
+	cacheName = "raft"
 )
 
 type IStore interface {
 	Sender() *sender.Sender
 	Sample(ctx context.Context, rangeID uint64, partition string, n int) ([]*approxlru.Sample[*replica.LRUSample], error)
+	GetRootDir() string
+	NHID() string
 }
 
 type Tracker struct {
@@ -83,7 +89,7 @@ type nodePartitionUsage struct {
 }
 
 type partitionUsage struct {
-	id    string
+	part  disk.Partition
 	store IStore
 
 	mu sync.Mutex
@@ -156,7 +162,7 @@ func (pu *partitionUsage) evict(ctx context.Context, sample *approxlru.Sample[*r
 	}
 
 	ageMillis := float64(time.Since(sample.Timestamp).Milliseconds())
-	metrics.RaftEvictionAgeMsec.With(prometheus.Labels{metrics.PartitionID: pu.id}).Observe(ageMillis)
+	metrics.RaftEvictionAgeMsec.With(prometheus.Labels{metrics.PartitionID: pu.part.ID}).Observe(ageMillis)
 
 	globalSizeBytes := pu.GlobalSizeBytes()
 
@@ -199,7 +205,7 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl
 
 	if totalCount <= 0 {
 		if totalCount < 0 {
-			log.Warningf("partitionUsage (id=%s) TotalCount (%d) is negative", pu.id, totalCount)
+			log.Warningf("partitionUsage (id=%s) TotalCount (%d) is negative", pu.part.ID, totalCount)
 		}
 		return nil, status.FailedPreconditionError("cannot sample empty partition")
 	}
@@ -211,9 +217,9 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl
 	for rangeID, u := range pu.replicas {
 		count += u.GetTotalCount()
 		if rn < count {
-			ps, err := pu.store.Sample(ctx, rangeID, pu.id, 1)
+			ps, err := pu.store.Sample(ctx, rangeID, pu.part.ID, 1)
 			if err != nil {
-				return nil, status.InternalErrorf("could not sample partition %q: %s", pu.id, err)
+				return nil, status.InternalErrorf("could not sample partition %q: %s", pu.part.ID, err)
 			}
 			samples = append(samples, ps...)
 			break
@@ -223,6 +229,15 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl
 	return samples, nil
 }
 
+func (pu *partitionUsage) updateMetrics() {
+	globalSizeBytes := pu.GlobalSizeBytes()
+
+	lbls := prometheus.Labels{metrics.PartitionID: pu.part.ID, metrics.CacheNameLabel: cacheName}
+
+	metrics.DiskCachePartitionSizeBytes.With(lbls).Set(float64(globalSizeBytes))
+	metrics.DiskCachePartitionCapacityBytes.With(lbls).Set(float64(pu.part.MaxSizeBytes))
+}
+
 func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDescriptor, partitions []disk.Partition, events <-chan events.Event) (*Tracker, error) {
 	ut := &Tracker{
 		store: store,
@@ -237,7 +252,7 @@ func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDe
 
 	for _, p := range partitions {
 		u := &partitionUsage{
-			id:       p.ID,
+			part:     p,
 			store:    store,
 			nodes:    make(map[string]*nodePartitionUsage),
 			replicas: make(map[uint64]*rfpb.PartitionMetadata),
@@ -278,6 +293,11 @@ func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDe
 		return nil
 	})
 
+	eg.Go(func() error {
+		ut.refreshMetrics(gctx)
+		return nil
+	})
+
 	gossipManager.AddListener(ut)
 	return ut, nil
 }
@@ -547,3 +567,27 @@ func (ut *Tracker) broadcast(force bool) error {
 
 	return nil
 }
+
+func (ut *Tracker) refreshMetrics(ctx context.Context) {
+	ticker := time.NewTicker(metricsRefreshPeriod)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			fsu := gosigar.FileSystemUsage{}
+			if err := fsu.Get(ut.store.GetRootDir()); err != nil {
+				log.Warningf("[%s] could not retrieve filesystem stats: %s", ut.store.NHID(), err)
+			} else {
+				metrics.DiskCacheFilesystemTotalBytes.With(prometheus.Labels{metrics.CacheNameLabel: cacheName}).Set(float64(fsu.Total))
+				metrics.DiskCacheFilesystemAvailBytes.With(prometheus.Labels{metrics.CacheNameLabel: cacheName}).Set(float64(fsu.Avail))
+			}
+
+			ut.mu.Lock()
+			for _, pu := range ut.byPartition {
+				pu.updateMetrics()
+			}
+			ut.mu.Unlock()
+		}
+	}
+}
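
The new refreshMetrics loop is the core of this change: on a 30-second ticker it stats the filesystem that backs the store's root directory via gosigar and publishes the totals, then updates per-partition size and capacity gauges. Below is a minimal, self-contained sketch of that reporting pattern for reference. The gauge names, the "cache_name" label, and the "/tmp" root directory are illustrative assumptions; the real gauges live in BuildBuddy's metrics package (DiskCacheFilesystemTotalBytes, DiskCacheFilesystemAvailBytes, DiskCachePartitionSizeBytes, DiskCachePartitionCapacityBytes) and are labeled with metrics.CacheNameLabel.

// Illustrative sketch only: mirrors the ticker + gosigar + Prometheus gauge
// pattern introduced by this patch. Gauge names and the root directory are
// hypothetical stand-ins, not BuildBuddy's actual metric definitions.
package main

import (
	"context"
	"log"
	"time"

	"github.com/elastic/gosigar"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Hypothetical gauges standing in for metrics.DiskCacheFilesystemTotalBytes
	// and metrics.DiskCacheFilesystemAvailBytes.
	fsTotalBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "disk_cache_filesystem_total_bytes",
		Help: "Total size in bytes of the filesystem backing the cache.",
	}, []string{"cache_name"})
	fsAvailBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "disk_cache_filesystem_avail_bytes",
		Help: "Available bytes on the filesystem backing the cache.",
	}, []string{"cache_name"})
)

func refreshFilesystemMetrics(ctx context.Context, rootDir, cacheName string, period time.Duration) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// gosigar's FileSystemUsage.Get stats the filesystem containing
			// rootDir; Total and Avail are reported in bytes.
			fsu := gosigar.FileSystemUsage{}
			if err := fsu.Get(rootDir); err != nil {
				log.Printf("could not retrieve filesystem stats: %s", err)
				continue
			}
			lbls := prometheus.Labels{"cache_name": cacheName}
			fsTotalBytes.With(lbls).Set(float64(fsu.Total))
			fsAvailBytes.With(lbls).Set(float64(fsu.Avail))
		}
	}
}

func main() {
	prometheus.MustRegister(fsTotalBytes, fsAvailBytes)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// "/tmp" is a placeholder for the store's root directory, i.e. what
	// Store.GetRootDir() returns in the patch.
	refreshFilesystemMetrics(ctx, "/tmp", "raft", 30*time.Second)
}

The sketch stops its ticker with a deferred Stop; the patch's refreshMetrics omits this because the goroutine (and its ticker) lives for the lifetime of the usage tracker and exits only when the errgroup context is canceled.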