Skip to content

Commit

Permalink
[raft] report partition size and file system size (#7215)
Browse files Browse the repository at this point in the history
  • Loading branch information
luluz66 authored Aug 9, 2024
1 parent e96062d commit 501308e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
4 changes: 4 additions & 0 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ func (s *Store) Sample(ctx context.Context, rangeID uint64, partition string, n
return r.Sample(ctx, partition, n)
}

// GetRootDir returns the value of the store's rootDir field, i.e. the
// directory this store was configured with. The usage tracker passes this
// path to its filesystem-usage lookup when reporting disk metrics.
func (s *Store) GetRootDir() string {
return s.rootDir
}

func (s *Store) replicaForRange(rangeID uint64) (*replica.Replica, *rfpb.RangeDescriptor, error) {
s.rangeMu.RLock()
rd, rangeOK := s.openRanges[rangeID]
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/raft/usagetracker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//server/util/proto",
"//server/util/status",
"@com_github_docker_go_units//:go-units",
"@com_github_elastic_gosigar//:gosigar",
"@com_github_hashicorp_serf//serf",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_x_sync//errgroup",
Expand Down
56 changes: 50 additions & 6 deletions enterprise/server/raft/usagetracker/usagetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/elastic/gosigar"

"github.com/docker/go-units"
"github.com/hashicorp/serf/serf"
Expand All @@ -36,6 +37,7 @@ var (
partitionUsageDeltaGossipThreshold = flag.Int("cache.raft.partition_usage_delta_bytes_threshold", 100e6, "Gossip partition usage information if it has changed by more than this amount since the last gossip.")
samplesPerEviction = flag.Int("cache.raft.samples_per_eviction", 20, "How many records to sample on each eviction")
samplePoolSize = flag.Int("cache.raft.sample_pool_size", 500, "How many deletion candidates to maintain between evictions")
metricsRefreshPeriod = 30 * time.Second
)

const (
Expand All @@ -54,11 +56,15 @@ const (

// How old store partition usage data can be before we consider it invalid.
storePartitionStalenessLimit = storePartitionUsageMaxAge * 2

cacheName = "raft"
)

// IStore is the subset of store functionality the usage tracker depends on.
// Declaring it here lets the tracker be driven by any implementation that
// provides these four methods.
type IStore interface {
	// NHID returns an identifier for the store; the tracker uses it as a
	// prefix in log messages. (Presumably the node host ID -- confirm against
	// the implementation.)
	NHID() string

	// GetRootDir returns the directory whose filesystem usage is reported by
	// the tracker's periodic metrics refresh.
	GetRootDir() string

	// Sender returns the store's *sender.Sender.
	Sender() *sender.Sender

	// Sample asks the range identified by rangeID for LRU samples from the
	// named partition; n is the number of samples requested.
	Sample(ctx context.Context, rangeID uint64, partition string, n int) ([]*approxlru.Sample[*replica.LRUSample], error)
}

type Tracker struct {
Expand All @@ -83,7 +89,7 @@ type nodePartitionUsage struct {
}

type partitionUsage struct {
id string
part disk.Partition
store IStore

mu sync.Mutex
Expand Down Expand Up @@ -156,7 +162,7 @@ func (pu *partitionUsage) evict(ctx context.Context, sample *approxlru.Sample[*r
}

ageMillis := float64(time.Since(sample.Timestamp).Milliseconds())
metrics.RaftEvictionAgeMsec.With(prometheus.Labels{metrics.PartitionID: pu.id}).Observe(ageMillis)
metrics.RaftEvictionAgeMsec.With(prometheus.Labels{metrics.PartitionID: pu.part.ID}).Observe(ageMillis)

globalSizeBytes := pu.GlobalSizeBytes()

Expand Down Expand Up @@ -199,7 +205,7 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl

if totalCount <= 0 {
if totalCount < 0 {
log.Warningf("partitionUsage (id=%s) TotalCount (%d) is negative", pu.id, totalCount)
log.Warningf("partitionUsage (id=%s) TotalCount (%d) is negative", pu.part.ID, totalCount)
}
return nil, status.FailedPreconditionError("cannot sample empty partition")
}
Expand All @@ -211,9 +217,9 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl
for rangeID, u := range pu.replicas {
count += u.GetTotalCount()
if rn < count {
ps, err := pu.store.Sample(ctx, rangeID, pu.id, 1)
ps, err := pu.store.Sample(ctx, rangeID, pu.part.ID, 1)
if err != nil {
return nil, status.InternalErrorf("could not sample partition %q: %s", pu.id, err)
return nil, status.InternalErrorf("could not sample partition %q: %s", pu.part.ID, err)
}
samples = append(samples, ps...)
break
Expand All @@ -223,6 +229,15 @@ func (pu *partitionUsage) sample(ctx context.Context, n int) ([]*approxlru.Sampl
return samples, nil
}

// updateMetrics sets the partition size and partition capacity metrics for
// this partition. Both metrics are labeled with the partition ID and the
// cache name; the size comes from GlobalSizeBytes and the capacity from the
// partition's configured MaxSizeBytes.
func (pu *partitionUsage) updateMetrics() {
	labels := prometheus.Labels{
		metrics.PartitionID:    pu.part.ID,
		metrics.CacheNameLabel: cacheName,
	}

	sizeBytes := float64(pu.GlobalSizeBytes())
	capacityBytes := float64(pu.part.MaxSizeBytes)

	metrics.DiskCachePartitionSizeBytes.With(labels).Set(sizeBytes)
	metrics.DiskCachePartitionCapacityBytes.With(labels).Set(capacityBytes)
}

func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDescriptor, partitions []disk.Partition, events <-chan events.Event) (*Tracker, error) {
ut := &Tracker{
store: store,
Expand All @@ -237,7 +252,7 @@ func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDe

for _, p := range partitions {
u := &partitionUsage{
id: p.ID,
part: p,
store: store,
nodes: make(map[string]*nodePartitionUsage),
replicas: make(map[uint64]*rfpb.PartitionMetadata),
Expand Down Expand Up @@ -278,6 +293,11 @@ func New(store IStore, gossipManager interfaces.GossipService, node *rfpb.NodeDe
return nil
})

eg.Go(func() error {
ut.refreshMetrics(gctx)
return nil
})

gossipManager.AddListener(ut)
return ut, nil
}
Expand Down Expand Up @@ -547,3 +567,27 @@ func (ut *Tracker) broadcast(force bool) error {

return nil
}

// refreshMetrics publishes disk usage metrics every metricsRefreshPeriod
// until ctx is cancelled. On each tick it:
//
//  1. reads filesystem usage for the store's root directory and sets the
//     filesystem total/available byte metrics (a failed read is logged and
//     skipped; it does not stop the loop), and
//  2. calls updateMetrics on every tracked partition while holding ut.mu.
//
// The method blocks until ctx is done, so it is meant to run in its own
// goroutine.
func (ut *Tracker) refreshMetrics(ctx context.Context) {
	ticker := time.NewTicker(metricsRefreshPeriod)
	// Stop the ticker when the loop exits so its resources are released
	// promptly instead of lingering after cancellation.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fsu := gosigar.FileSystemUsage{}
			if err := fsu.Get(ut.store.GetRootDir()); err != nil {
				// Best effort: keep reporting partition metrics even when
				// filesystem stats are unavailable.
				log.Warningf("[%s] could not retrieve filesystem stats: %s", ut.store.NHID(), err)
			} else {
				fsLabels := prometheus.Labels{metrics.CacheNameLabel: cacheName}
				metrics.DiskCacheFilesystemTotalBytes.With(fsLabels).Set(float64(fsu.Total))
				metrics.DiskCacheFilesystemAvailBytes.With(fsLabels).Set(float64(fsu.Avail))
			}

			// byPartition is read under ut.mu, matching the original locking.
			ut.mu.Lock()
			for _, pu := range ut.byPartition {
				pu.updateMetrics()
			}
			ut.mu.Unlock()
		}
	}
}

0 comments on commit 501308e

Please sign in to comment.