Skip to content

Commit

Permalink
Add cache metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 27, 2024
1 parent a2c05cb commit dfd2925
Show file tree
Hide file tree
Showing 15 changed files with 527 additions and 83 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ require (
github.com/docker/docker v25.0.5+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down
11 changes: 8 additions & 3 deletions relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func newBlobProvider(
blobStore *blobstore.BlobStore,
blobCacheSize uint64,
maxIOConcurrency int,
fetchTimeout time.Duration) (*blobProvider, error) {
fetchTimeout time.Duration,
metrics *cache.CacheAccessorMetrics) (*blobProvider, error) {

server := &blobProvider{
ctx: ctx,
Expand All @@ -42,9 +43,13 @@ func newBlobProvider(
fetchTimeout: fetchTimeout,
}

c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight)
cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](
computeBlobCacheWeight,
blobCacheSize,
maxIOConcurrency,
server.fetchBlob,
metrics)

cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions relay/blob_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestReadWrite(t *testing.T) {
blobStore,
1024*1024*32,
32,
10*time.Second)
10*time.Second,
nil)
require.NoError(t, err)

// Read the blobs back.
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestNonExistentBlob(t *testing.T) {
blobStore,
1024*1024*32,
32,
10*time.Second)
10*time.Second,
nil)
require.NoError(t, err)

for i := 0; i < 10; i++ {
Expand Down
77 changes: 70 additions & 7 deletions relay/cache/cache_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"golang.org/x/sync/semaphore"
"sync"
"time"
)

// CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
Expand Down Expand Up @@ -47,6 +48,9 @@ type cacheAccessor[K comparable, V any] struct {
// cache is the underlying cache that this wrapper manages.
cache Cache[K, V]

// weightCalculator is the function used to calculate the weight of a key-value pair.
calculator WeightCalculator[K, V]

// concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress.
concurrencyLimiter chan struct{}

Expand All @@ -55,17 +59,29 @@ type cacheAccessor[K comparable, V any] struct {

// accessor is the function used to fetch values that are not in the cache.
accessor Accessor[K, V]

// insertionTimes is a map of keys to the time they were inserted into the cache. Used to calculate the average
// lifespan of items in the cache.
insertionTimes map[K]time.Time

// metrics is used to record metrics about the cache accessor's performance.
metrics *CacheAccessorMetrics
}

// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items
// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent
// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess
// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor
// parameter is the function used to fetch values that are not in the cache.
// NewCacheAccessor creates a new CacheAccessor.
//
// The concurrencyLimit parameter specifies the maximum number of concurrent lookups that can be in progress at any
// given time. If a greater number of lookups are requested, the excess lookups will block until a lookup completes.
// If concurrencyLimit is zero, then no limits are imposed. The accessor parameter is the function used to fetch values that are not in the cache.
//
// If metrics is not nil, it will be used to record metrics about the cache accessor's performance.
// If nil, no metrics will be recorded.
func NewCacheAccessor[K comparable, V any](
cache Cache[K, V],
calculator WeightCalculator[K, V],
maxWeight uint64,
concurrencyLimit int,
accessor Accessor[K, V]) (CacheAccessor[K, V], error) {
accessor Accessor[K, V],
metrics *CacheAccessorMetrics) (CacheAccessor[K, V], error) {

lookupsInProgress := make(map[K]*accessResult[V])

Expand All @@ -74,11 +90,30 @@ func NewCacheAccessor[K comparable, V any](
concurrencyLimiter = make(chan struct{}, concurrencyLimit)
}

insertionTimes := make(map[K]time.Time)
var evictionHandler func(K, V)
if metrics != nil {
// If metrics are enabled, track the amount of time each item spends in the cache.
// Thread safety is provided by the cacheLock.
evictionHandler = func(key K, _ V) {
if insertionTime, ok := insertionTimes[key]; ok {
lifespan := time.Since(insertionTime).Milliseconds()
metrics.averageLifespan.Update(float64(lifespan))
delete(insertionTimes, key)
}
}
}

cache := NewFIFOCache(maxWeight, calculator, evictionHandler)

return &cacheAccessor[K, V]{
cache: cache,
calculator: calculator,
concurrencyLimiter: concurrencyLimiter,
accessor: accessor,
lookupsInProgress: lookupsInProgress,
insertionTimes: insertionTimes,
metrics: metrics,
}, nil
}

Expand All @@ -97,6 +132,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
v, ok := c.cache.Get(key)
if ok {
c.cacheLock.Unlock()

if c.metrics != nil {
c.metrics.cacheHits.Increment()
}
return v, nil
}

Expand All @@ -109,6 +148,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {

c.cacheLock.Unlock()

if c.metrics != nil {
c.metrics.cacheMisses.Increment()
}

if alreadyLoading {
// The result is being fetched on another goroutine. Wait for it to finish.
return c.waitForResult(ctx, result)
Expand Down Expand Up @@ -150,11 +193,31 @@ func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *ac
<-c.concurrencyLimiter
}

if c.metrics != nil {
start := time.Now()
defer func() {
c.metrics.cacheMissLatency.ReportLatency(time.Since(start))
}()
}

c.cacheLock.Lock()

// Update the cache if the fetch was successful.
if err == nil {
c.cache.Put(key, value)

if c.metrics != nil {
c.insertionTimes[key] = time.Now()
size := c.cache.Size()
weight := c.cache.Weight()
c.metrics.size.Set(float64(size))
c.metrics.weight.Set(float64(weight))
var averageWeight float64
if size > 0 {
averageWeight = float64(weight) / float64(size)
}
c.metrics.averageWeight.Set(averageWeight)
}
}

// Provide the result to all other goroutines that may be waiting for it.
Expand Down
98 changes: 98 additions & 0 deletions relay/cache/cache_accessor_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package cache

import (
"fmt"
"github.com/Layr-Labs/eigenda/common/metrics"
"time"
)

// CacheAccessorMetrics provides metrics for a CacheAccessor.
type CacheAccessorMetrics struct {
	// cacheHits counts lookups that were served directly from the cache.
	cacheHits metrics.CountMetric
	// cacheMisses counts lookups that required fetching the value via the accessor.
	cacheMisses metrics.CountMetric
	// size tracks the current number of items held in the cache.
	size metrics.GaugeMetric
	// weight tracks the total weight of all items currently in the cache.
	weight metrics.GaugeMetric
	// averageWeight tracks weight/size, i.e. the mean weight of a cached item.
	averageWeight metrics.GaugeMetric
	// averageLifespan tracks, in milliseconds, how long items remain cached before eviction.
	averageLifespan metrics.RunningAverageMetric
	// cacheMissLatency measures how long it takes to fetch a value on a cache miss.
	cacheMissLatency metrics.LatencyMetric
}

// NewCacheAccessorMetrics creates a new CacheAccessorMetrics.
//
// All metric names are prefixed with cacheName so that multiple caches can be
// tracked by the same metrics server. An error from registering any individual
// metric aborts construction and is returned to the caller.
func NewCacheAccessorMetrics(
	server metrics.Metrics,
	cacheName string) (*CacheAccessorMetrics, error) {

	m := &CacheAccessorMetrics{}
	var err error

	m.cacheHits, err = server.NewCountMetric(
		fmt.Sprintf("%s_cache_hit", cacheName),
		fmt.Sprintf("Number of cache hits in the %s cache", cacheName),
		nil)
	if err != nil {
		return nil, err
	}

	m.cacheMisses, err = server.NewCountMetric(
		fmt.Sprintf("%s_cache_miss", cacheName),
		fmt.Sprintf("Number of cache misses in the %s cache", cacheName),
		nil)
	if err != nil {
		return nil, err
	}

	m.size, err = server.NewGaugeMetric(
		fmt.Sprintf("%s_cache", cacheName),
		"size",
		fmt.Sprintf("Number of items in the %s cache", cacheName),
		nil)
	if err != nil {
		return nil, err
	}

	m.weight, err = server.NewGaugeMetric(
		fmt.Sprintf("%s_cache", cacheName),
		"weight",
		fmt.Sprintf("Total weight of items in the %s cache", cacheName),
		nil)
	if err != nil {
		return nil, err
	}

	m.averageWeight, err = server.NewGaugeMetric(
		fmt.Sprintf("%s_cache_average", cacheName),
		"weight",
		fmt.Sprintf("Average weight of items currently in the %s cache", cacheName),
		nil)
	if err != nil {
		return nil, err
	}

	// Lifespans are reported in milliseconds, averaged over a one-minute window.
	m.averageLifespan, err = server.NewRunningAverageMetric(
		fmt.Sprintf("%s_cache_average_lifespan", cacheName),
		"ms",
		fmt.Sprintf("Average time an item remains in the %s cache before being evicted.", cacheName),
		time.Minute,
		nil)
	if err != nil {
		return nil, err
	}

	// Track median, p90, and p99 fetch latencies for cache misses.
	m.cacheMissLatency, err = server.NewLatencyMetric(
		fmt.Sprintf("%s_cache_miss_latency", cacheName),
		fmt.Sprintf("Latency of cache misses in the %s cache", cacheName),
		nil,
		&metrics.Quantile{Quantile: 0.5, Error: 0.05},
		&metrics.Quantile{Quantile: 0.9, Error: 0.05},
		&metrics.Quantile{Quantile: 0.99, Error: 0.05})
	if err != nil {
		return nil, err
	}

	return m, nil
}
40 changes: 7 additions & 33 deletions relay/cache/cache_accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ func TestRandomOperationsSingleThread(t *testing.T) {
return &str, nil
}
cacheSize := rand.Intn(dataSize) + 1
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

for i := 0; i < dataSize; i++ {
Expand Down Expand Up @@ -83,11 +80,7 @@ func TestCacheMisses(t *testing.T) {
return &str, nil
}

c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

// Get the first cacheSize keys. This should fill the cache.
Expand Down Expand Up @@ -150,11 +143,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

// Lock the accessor. This will cause all cache misses to block.
Expand Down Expand Up @@ -223,11 +212,7 @@ func TestParallelAccessWithError(t *testing.T) {
}
cacheSize := 100

c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

// Lock the accessor. This will cause all cache misses to block.
Expand Down Expand Up @@ -299,11 +284,8 @@ func TestConcurrencyLimiter(t *testing.T) {
}

cacheSize := 100
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, maxConcurrency, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -357,11 +339,7 @@ func TestOriginalRequesterTimesOut(t *testing.T) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

// Lock the accessor. This will cause all cache misses to block.
Expand Down Expand Up @@ -449,11 +427,7 @@ func TestSecondaryRequesterTimesOut(t *testing.T) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil)
require.NoError(t, err)

// Lock the accessor. This will cause all cache misses to block.
Expand Down
Loading

0 comments on commit dfd2925

Please sign in to comment.