diff --git a/go.mod b/go.mod index bab065561..dc262e397 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,6 @@ require ( github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/emirpasic/gods v1.18.1 // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect diff --git a/relay/blob_provider.go b/relay/blob_provider.go index 70cc31066..e8e84e1d9 100644 --- a/relay/blob_provider.go +++ b/relay/blob_provider.go @@ -33,7 +33,8 @@ func newBlobProvider( blobStore *blobstore.BlobStore, blobCacheSize uint64, maxIOConcurrency int, - fetchTimeout time.Duration) (*blobProvider, error) { + fetchTimeout time.Duration, + metrics *cache.CacheAccessorMetrics) (*blobProvider, error) { server := &blobProvider{ ctx: ctx, @@ -42,9 +43,13 @@ func newBlobProvider( fetchTimeout: fetchTimeout, } - c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight) + cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte]( + computeBlobCacheWeight, + blobCacheSize, + maxIOConcurrency, + server.fetchBlob, + metrics) - cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob) if err != nil { return nil, fmt.Errorf("error creating blob cache: %w", err) } diff --git a/relay/blob_provider_test.go b/relay/blob_provider_test.go index 22368a5d5..8ac2e6657 100644 --- a/relay/blob_provider_test.go +++ b/relay/blob_provider_test.go @@ -41,7 +41,8 @@ func TestReadWrite(t *testing.T) { blobStore, 1024*1024*32, 32, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read the blobs back. 
@@ -78,7 +79,8 @@ func TestNonExistentBlob(t *testing.T) { blobStore, 1024*1024*32, 32, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) for i := 0; i < 10; i++ { diff --git a/relay/cache/cache_accessor.go b/relay/cache/cache_accessor.go index a6389538b..bfebf1c6b 100644 --- a/relay/cache/cache_accessor.go +++ b/relay/cache/cache_accessor.go @@ -4,6 +4,7 @@ import ( "context" "golang.org/x/sync/semaphore" "sync" + "time" ) // CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses @@ -47,6 +48,9 @@ type cacheAccessor[K comparable, V any] struct { // cache is the underlying cache that this wrapper manages. cache Cache[K, V] + // weightCalculator is the function used to calculate the weight of a key-value pair. + calculator WeightCalculator[K, V] + // concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress. concurrencyLimiter chan struct{} @@ -55,17 +59,29 @@ type cacheAccessor[K comparable, V any] struct { // accessor is the function used to fetch values that are not in the cache. accessor Accessor[K, V] + + // insertionTimes is a map of keys to the time they were inserted into the cache. Used to calculate the average + // lifespan of items in the cache. + insertionTimes map[K]time.Time + + // metrics is used to record metrics about the cache accessor's performance. + metrics *CacheAccessorMetrics } -// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items -// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent -// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess -// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor -// parameter is the function used to fetch values that are not in the cache. 
+// NewCacheAccessor creates a new CacheAccessor. +// +// The concurrencyLimit parameter specifies the maximum number of concurrent lookups that can be in progress at any +// given time. If a greater number of lookups are requested, the excess lookups will block until a lookup completes. +// If concurrencyLimit is zero, then no limits are imposed. The accessor parameter is the function used to fetch values that are not in the cache. +// +// If metrics is not nil, it will be used to record metrics about the cache accessor's performance. +// If nil, no metrics will be recorded. func NewCacheAccessor[K comparable, V any]( - cache Cache[K, V], + calculator WeightCalculator[K, V], + maxWeight uint64, concurrencyLimit int, - accessor Accessor[K, V]) (CacheAccessor[K, V], error) { + accessor Accessor[K, V], + metrics *CacheAccessorMetrics) (CacheAccessor[K, V], error) { lookupsInProgress := make(map[K]*accessResult[V]) @@ -74,11 +90,30 @@ func NewCacheAccessor[K comparable, V any]( concurrencyLimiter = make(chan struct{}, concurrencyLimit) } + insertionTimes := make(map[K]time.Time) + var evictionHandler func(K, V) + if metrics != nil { + // If metrics are enabled, track the amount of time each item spends in the cache. + // Thread safety is provided by the cacheLock. 
+ evictionHandler = func(key K, _ V) { + if insertionTime, ok := insertionTimes[key]; ok { + lifespan := time.Since(insertionTime).Milliseconds() + metrics.averageLifespan.Update(float64(lifespan)) + delete(insertionTimes, key) + } + } + } + + cache := NewFIFOCache(maxWeight, calculator, evictionHandler) + return &cacheAccessor[K, V]{ cache: cache, + calculator: calculator, concurrencyLimiter: concurrencyLimiter, accessor: accessor, lookupsInProgress: lookupsInProgress, + insertionTimes: insertionTimes, + metrics: metrics, }, nil } @@ -97,6 +132,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { v, ok := c.cache.Get(key) if ok { c.cacheLock.Unlock() + + if c.metrics != nil { + c.metrics.cacheHits.Increment() + } return v, nil } @@ -109,6 +148,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { c.cacheLock.Unlock() + if c.metrics != nil { + c.metrics.cacheMisses.Increment() + } + if alreadyLoading { // The result is being fetched on another goroutine. Wait for it to finish. return c.waitForResult(ctx, result) @@ -150,11 +193,31 @@ func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *ac <-c.concurrencyLimiter } + if c.metrics != nil { + start := time.Now() + defer func() { + c.metrics.cacheMissLatency.ReportLatency(time.Since(start)) + }() + } + c.cacheLock.Lock() // Update the cache if the fetch was successful. if err == nil { c.cache.Put(key, value) + + if c.metrics != nil { + c.insertionTimes[key] = time.Now() + size := c.cache.Size() + weight := c.cache.Weight() + c.metrics.size.Set(float64(size)) + c.metrics.weight.Set(float64(weight)) + var averageWeight float64 + if size > 0 { + averageWeight = float64(weight) / float64(size) + } + c.metrics.averageWeight.Set(averageWeight) + } } // Provide the result to all other goroutines that may be waiting for it. 
diff --git a/relay/cache/cache_accessor_metrics.go b/relay/cache/cache_accessor_metrics.go new file mode 100644 index 000000000..e4a4d50fc --- /dev/null +++ b/relay/cache/cache_accessor_metrics.go @@ -0,0 +1,98 @@ +package cache + +import ( + "fmt" + "github.com/Layr-Labs/eigenda/common/metrics" + "time" +) + +// CacheAccessorMetrics provides metrics for a CacheAccessor. +type CacheAccessorMetrics struct { + cacheHits metrics.CountMetric + cacheMisses metrics.CountMetric + size metrics.GaugeMetric + weight metrics.GaugeMetric + averageWeight metrics.GaugeMetric + averageLifespan metrics.RunningAverageMetric + cacheMissLatency metrics.LatencyMetric +} + +// NewCacheAccessorMetrics creates a new CacheAccessorMetrics. +func NewCacheAccessorMetrics( + server metrics.Metrics, + cacheName string) (*CacheAccessorMetrics, error) { + + cacheHits, err := server.NewCountMetric( + fmt.Sprintf("%s_cache_hit", cacheName), + fmt.Sprintf("Number of cache hits in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + cacheMisses, err := server.NewCountMetric( + fmt.Sprintf("%s_cache_miss", cacheName), + fmt.Sprintf("Number of cache misses in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + size, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache", cacheName), + "size", + fmt.Sprintf("Number of items in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + weight, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache", cacheName), + "weight", + fmt.Sprintf("Total weight of items in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + averageWeight, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache_average", cacheName), + "weight", + fmt.Sprintf("Average weight of items currently in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + averageLifespan, err := server.NewRunningAverageMetric( + fmt.Sprintf("%s_cache_average_lifespan", cacheName), + "ms", + 
fmt.Sprintf("Average time an item remains in the %s cache before being evicted.", cacheName), + time.Minute, + nil) + if err != nil { + return nil, err + } + + cacheMissLatency, err := server.NewLatencyMetric( + fmt.Sprintf("%s_cache_miss_latency", cacheName), + fmt.Sprintf("Latency of cache misses in the %s cache", cacheName), + nil, + &metrics.Quantile{Quantile: 0.5, Error: 0.05}, + &metrics.Quantile{Quantile: 0.9, Error: 0.05}, + &metrics.Quantile{Quantile: 0.99, Error: 0.05}) + if err != nil { + return nil, err + } + + return &CacheAccessorMetrics{ + cacheHits: cacheHits, + cacheMisses: cacheMisses, + size: size, + weight: weight, + averageWeight: averageWeight, + averageLifespan: averageLifespan, + cacheMissLatency: cacheMissLatency, + }, nil +} diff --git a/relay/cache/cache_accessor_test.go b/relay/cache/cache_accessor_test.go index 0f2ac501d..16f33d0b0 100644 --- a/relay/cache/cache_accessor_test.go +++ b/relay/cache/cache_accessor_test.go @@ -32,11 +32,8 @@ func TestRandomOperationsSingleThread(t *testing.T) { return &str, nil } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) for i := 0; i < dataSize; i++ { @@ -83,11 +80,7 @@ func TestCacheMisses(t *testing.T) { return &str, nil } - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) // Get the first cacheSize keys. This should fill the cache. 
@@ -150,11 +143,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -223,11 +212,7 @@ func TestParallelAccessWithError(t *testing.T) { } cacheSize := 100 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -299,11 +284,8 @@ func TestConcurrencyLimiter(t *testing.T) { } cacheSize := 100 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - ca, err := NewCacheAccessor[int, *string](c, maxConcurrency, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), maxConcurrency, accessor, nil) require.NoError(t, err) wg := sync.WaitGroup{} @@ -357,11 +339,7 @@ func TestOriginalRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. 
@@ -449,11 +427,7 @@ func TestSecondaryRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + ca, err := NewCacheAccessor[int, *string](nil, uint64(cacheSize), 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. diff --git a/relay/cache/fifo-cache.go b/relay/cache/fifo-cache.go index 1c2e7c6ab..d6e44d3d4 100644 --- a/relay/cache/fifo-cache.go +++ b/relay/cache/fifo-cache.go @@ -16,15 +16,30 @@ type FIFOCache[K comparable, V any] struct { maxWeight uint64 data map[K]V expirationQueue queues.Queue + evictionHandler func(K, V) } -// NewFIFOCache creates a new FIFOCache. -func NewFIFOCache[K comparable, V any](maxWeight uint64, calculator WeightCalculator[K, V]) *FIFOCache[K, V] { +// NewFIFOCache creates a new FIFOCache. If the calculator is nil, the weight of each key-value pair will be 1. +// If the evictionHandler is nil it is ignored. 
+func NewFIFOCache[K comparable, V any]( + maxWeight uint64, + calculator WeightCalculator[K, V], + evictionHandler func(K, V)) *FIFOCache[K, V] { + + if calculator == nil { + calculator = func(K, V) uint64 { return 1 } + } + + if evictionHandler == nil { + evictionHandler = func(K, V) {} + } + return &FIFOCache[K, V]{ maxWeight: maxWeight, data: make(map[K]V), weightCalculator: calculator, expirationQueue: linkedlistqueue.New(), + evictionHandler: evictionHandler, } } @@ -61,6 +76,7 @@ func (f *FIFOCache[K, V]) Put(key K, value V) { weightToEvict := f.weightCalculator(keyToEvict, f.data[keyToEvict]) + f.evictionHandler(keyToEvict, f.data[keyToEvict]) delete(f.data, keyToEvict) f.currentWeight -= weightToEvict } } diff --git a/relay/cache/fifo_cache_test.go b/relay/cache/fifo_cache_test.go index da4de5ad1..3709c1911 100644 --- a/relay/cache/fifo_cache_test.go +++ b/relay/cache/fifo_cache_test.go @@ -11,9 +11,7 @@ func TestExpirationOrder(t *testing.T) { tu.InitializeRandom() maxWeight := uint64(10 + rand.Intn(10)) - c := NewFIFOCache[int, int](maxWeight, func(key int, value int) uint64 { - return 1 - }) + c := NewFIFOCache[int, int](maxWeight, nil, nil) require.Equal(t, uint64(0), c.Weight()) require.Equal(t, 0, c.Size()) @@ -85,7 +83,7 @@ func TestWeightedValues(t *testing.T) { return uint64(key) } - c := NewFIFOCache[int, int](maxWeight, weightCalculator) + c := NewFIFOCache[int, int](maxWeight, weightCalculator, nil) expectedValues := make(map[int]int) diff --git a/relay/chunk_provider.go b/relay/chunk_provider.go index 5bc292673..256607459 100644 --- a/relay/chunk_provider.go +++ b/relay/chunk_provider.go @@ -50,7 +50,8 @@ func newChunkProvider( cacheSize uint64, maxIOConcurrency int, proofFetchTimeout time.Duration, - coefficientFetchTimeout time.Duration) (*chunkProvider, error) { + coefficientFetchTimeout time.Duration, + metrics *cache.CacheAccessorMetrics) (*chunkProvider, error) { server := &chunkProvider{ ctx: ctx, @@ -60,12 +61,12 @@ func
newChunkProvider( coefficientFetchTimeout: coefficientFetchTimeout, } - c := cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight) - cacheAccessor, err := cache.NewCacheAccessor[blobKeyWithMetadata, []*encoding.Frame]( - c, + computeFramesCacheWeight, + cacheSize, maxIOConcurrency, - server.fetchFrames) + server.fetchFrames, + metrics) if err != nil { return nil, err } diff --git a/relay/chunk_provider_test.go b/relay/chunk_provider_test.go index 06ec215b8..99a85345d 100644 --- a/relay/chunk_provider_test.go +++ b/relay/chunk_provider_test.go @@ -52,7 +52,8 @@ func TestFetchingIndividualBlobs(t *testing.T) { 1024*1024*32, 32, 10*time.Second, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read it back. @@ -139,7 +140,8 @@ func TestFetchingBatchedBlobs(t *testing.T) { 1024*1024*32, 32, 10*time.Second, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read it back. diff --git a/relay/mdoc/relay-metrics.md b/relay/mdoc/relay-metrics.md index 02f24a340..46ea716b1 100644 --- a/relay/mdoc/relay-metrics.md +++ b/relay/mdoc/relay-metrics.md @@ -1,8 +1,8 @@ # Metrics Documentation for namespace 'relay' -This documentation was automatically generated at time `2024-11-27T10:22:37-06:00` +This documentation was automatically generated at time `2024-11-27T12:13:08-06:00` -There are a total of `13` registered metrics. +There are a total of `34` registered metrics. 
--- @@ -45,6 +45,178 @@ Average number of keys in a GetChunks request | **Fully Qualified Name** | `relay_average_get_chunks_key_count` | --- +## blob_cache_size + +Number of items in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_size` | +--- + +## blob_cache_weight + +Total weight of items in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_weight` | +--- + +## blob_cache_average_weight + +Average weight of items currently in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_average_weight` | +--- + +## blob_cache_average_lifespan_ms + +Average time an item remains in the blob cache before being evicted. + +| | | +|---|---| +| **Name** | `blob_cache_average_lifespan` | +| **Unit** | `ms` | +| **Type** | `running average` | +| **Time Window** | `1m0s` | +| **Fully Qualified Name** | `relay_blob_cache_average_lifespan_ms` | +--- + +## blob_cache_hit_count + +Number of cache hits in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_blob_cache_hit_count` | +--- + +## blob_cache_miss_count + +Number of cache misses in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_blob_cache_miss_count` | +--- + +## blob_cache_miss_latency_ms + +Latency of cache misses in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_blob_cache_miss_latency_ms` | +--- + +## chunk_cache_size + 
+Number of items in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_size` | +--- + +## chunk_cache_weight + +Total weight of items in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_weight` | +--- + +## chunk_cache_average_weight + +Average weight of items currently in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_average_weight` | +--- + +## chunk_cache_average_lifespan_ms + +Average time an item remains in the chunk cache before being evicted. + +| | | +|---|---| +| **Name** | `chunk_cache_average_lifespan` | +| **Unit** | `ms` | +| **Type** | `running average` | +| **Time Window** | `1m0s` | +| **Fully Qualified Name** | `relay_chunk_cache_average_lifespan_ms` | +--- + +## chunk_cache_hit_count + +Number of cache hits in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_chunk_cache_hit_count` | +--- + +## chunk_cache_miss_count + +Number of cache misses in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_chunk_cache_miss_count` | +--- + +## chunk_cache_miss_latency_ms + +Latency of cache misses in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_chunk_cache_miss_latency_ms` | +--- + ## get_blob_data_latency_ms Latency of the GetBlob RPC data retrieval @@ -172,3 +344,89 @@ Number of GetChunks RPC rate limited | **Labels** | `reason` | | 
**Type** | `counter` | | **Fully Qualified Name** | `relay_get_chunks_rate_limited_count` | +--- + +## metadata_cache_size + +Number of items in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_size` | +--- + +## metadata_cache_weight + +Total weight of items in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_weight` | +--- + +## metadata_cache_average_weight + +Average weight of items currently in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_average_weight` | +--- + +## metadata_cache_average_lifespan_ms + +Average time an item remains in the metadata cache before being evicted. + +| | | +|---|---| +| **Name** | `metadata_cache_average_lifespan` | +| **Unit** | `ms` | +| **Type** | `running average` | +| **Time Window** | `1m0s` | +| **Fully Qualified Name** | `relay_metadata_cache_average_lifespan_ms` | +--- + +## metadata_cache_hit_count + +Number of cache hits in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_metadata_cache_hit_count` | +--- + +## metadata_cache_miss_count + +Number of cache misses in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_metadata_cache_miss_count` | +--- + +## metadata_cache_miss_latency_ms + +Latency of cache misses in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | 
`relay_metadata_cache_miss_latency_ms` | diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go index e1f188bb9..9bdb70964 100644 --- a/relay/metadata_provider.go +++ b/relay/metadata_provider.go @@ -58,7 +58,8 @@ func newMetadataProvider( maxIOConcurrency int, relayIDs []v2.RelayKey, fetchTimeout time.Duration, - blobParamsMap *v2.BlobVersionParameterMap) (*metadataProvider, error) { + blobParamsMap *v2.BlobVersionParameterMap, + metrics *cache.CacheAccessorMetrics) (*metadataProvider, error) { relayIDSet := make(map[v2.RelayKey]struct{}, len(relayIDs)) for _, id := range relayIDs { @@ -74,15 +75,12 @@ func newMetadataProvider( } server.blobParamsMap.Store(blobParamsMap) - c := cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize), - func(key v2.BlobKey, value *blobMetadata) uint64 { - return uint64(1) - }) - metadataCache, err := cache.NewCacheAccessor[v2.BlobKey, *blobMetadata]( - c, + nil, + uint64(metadataCacheSize), maxIOConcurrency, - server.fetchMetadata) + server.fetchMetadata, + metrics) if err != nil { return nil, fmt.Errorf("error creating metadata cache: %w", err) } diff --git a/relay/metadata_provider_test.go b/relay/metadata_provider_test.go index b48e157ec..093cfd9f6 100644 --- a/relay/metadata_provider_test.go +++ b/relay/metadata_provider_test.go @@ -32,7 +32,8 @@ func TestGetNonExistentBlob(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Try to fetch a non-existent blobs @@ -98,7 +99,8 @@ func TestFetchingIndividualMetadata(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) @@ -183,7 +185,8 @@ func TestBatchedFetch(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + 
v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Each iteration, choose a random subset of the keys to fetch @@ -289,7 +292,8 @@ func TestIndividualFetchWithSharding(t *testing.T) { 32, shardList, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Fetch the metadata from the server. @@ -421,7 +425,8 @@ func TestBatchedFetchWithSharding(t *testing.T) { 32, shardList, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Each iteration, choose two random keys to fetch. There will be a 25% chance that both blobs map to valid shards. diff --git a/relay/relay_metrics.go b/relay/metrics.go similarity index 89% rename from relay/relay_metrics.go rename to relay/metrics.go index 5aac47bfc..ebf50e3f1 100644 --- a/relay/relay_metrics.go +++ b/relay/metrics.go @@ -2,6 +2,7 @@ package relay import ( "github.com/Layr-Labs/eigenda/common/metrics" + "github.com/Layr-Labs/eigenda/relay/cache" "github.com/Layr-Labs/eigenda/relay/limiter" "github.com/Layr-Labs/eigensdk-go/logging" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" @@ -13,7 +14,10 @@ type RelayMetrics struct { metricsServer metrics.Metrics grpcServerOption grpc.ServerOption - // TODO (cody-littley): after cache changes merge, add metrics for cache + // Cache metrics + MetadataCacheMetrics *cache.CacheAccessorMetrics + ChunkCacheMetrics *cache.CacheAccessorMetrics + BlobCacheMetrics *cache.CacheAccessorMetrics // GetChunks metrics GetChunksLatency metrics.LatencyMetric @@ -50,6 +54,21 @@ func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) { metrics.NewQuantile(0.99), } + metadataCacheMetrics, err := cache.NewCacheAccessorMetrics(server, "metadata") + if err != nil { + return nil, err + } + + chunkCacheMetrics, err := 
cache.NewCacheAccessorMetrics(server, "chunk") + if err != nil { + return nil, err + } + + blobCacheMetrics, err := cache.NewCacheAccessorMetrics(server, "blob") + if err != nil { + return nil, err + } + getChunksLatencyMetric, err := server.NewLatencyMetric( "get_chunks_latency", "Latency of the GetChunks RPC", @@ -169,6 +188,9 @@ func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) { return &RelayMetrics{ metricsServer: server, + MetadataCacheMetrics: metadataCacheMetrics, + ChunkCacheMetrics: chunkCacheMetrics, + BlobCacheMetrics: blobCacheMetrics, grpcServerOption: grpcServerOption, GetChunksLatency: getChunksLatencyMetric, GetChunksAuthenticationLatency: getChunksAuthenticationLatencyMetric, diff --git a/relay/server.go b/relay/server.go index 78703ca2f..61d920dfe 100644 --- a/relay/server.go +++ b/relay/server.go @@ -141,6 +141,11 @@ func NewServer( return nil, fmt.Errorf("error fetching blob params: %w", err) } + metrics, err := NewRelayMetrics(logger, config.MetricsPort) + if err != nil { + return nil, fmt.Errorf("error creating metrics: %w", err) + } + mp, err := newMetadataProvider( ctx, logger, @@ -149,7 +154,8 @@ func NewServer( config.MetadataMaxConcurrency, config.RelayIDs, config.Timeouts.InternalGetMetadataTimeout, - v2.NewBlobVersionParameterMap(blobParams)) + v2.NewBlobVersionParameterMap(blobParams), + metrics.MetadataCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating metadata provider: %w", err) @@ -161,7 +167,8 @@ func NewServer( blobStore, config.BlobCacheBytes, config.BlobMaxConcurrency, - config.Timeouts.InternalGetBlobTimeout) + config.Timeouts.InternalGetBlobTimeout, + metrics.BlobCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating blob provider: %w", err) } @@ -173,7 +180,8 @@ func NewServer( config.ChunkCacheSize, config.ChunkMaxConcurrency, config.Timeouts.InternalGetProofsTimeout, - config.Timeouts.InternalGetCoefficientsTimeout) + 
config.Timeouts.InternalGetCoefficientsTimeout, + metrics.ChunkCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating chunk provider: %w", err) } @@ -190,11 +198,6 @@ func NewServer( } } - metrics, err := NewRelayMetrics(logger, config.MetricsPort) - if err != nil { - return nil, fmt.Errorf("error creating metrics: %w", err) - } - return &Server{ config: config, logger: logger,