Skip to content

Commit

Permalink
Add in-memory chunk cache (cortexproject#6245)
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 authored Oct 2, 2024
1 parent 6a7f44c commit 53556fe
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 14 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## master / unreleased

* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
Expand Down
9 changes: 8 additions & 1 deletion docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,17 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
Expand Down
9 changes: 8 additions & 1 deletion docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -903,10 +903,17 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
Expand Down
9 changes: 8 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1339,10 +1339,17 @@ bucket_store:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query,
Expand Down
13 changes: 13 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
chunkCacheBackend: tsdb.CacheBackendRedis,
bucketIndexEnabled: true,
},
"blocks default sharding, in-memory chunk cache": {
blocksShardingStrategy: "default",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
"blocks shuffle sharding, in-memory chunk cache": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
}

for testName, testCfg := range tests {
Expand Down
102 changes: 92 additions & 10 deletions pkg/storage/tsdb/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/golang/snappy"
"github.com/oklog/ulid"
Expand All @@ -18,22 +19,28 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/model"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

var (
errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend")
)

const (
CacheBackendMemcached = "memcached"
CacheBackendRedis = "redis"
CacheBackendInMemory = "inmemory"
)

type CacheBackend struct {
type MetadataCacheBackend struct {
Backend string `yaml:"backend"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
}

// Validate the config.
func (cfg *CacheBackend) Validate() error {
func (cfg *MetadataCacheBackend) Validate() error {
switch cfg.Backend {
case CacheBackendMemcached:
return cfg.Memcached.Validate()
Expand All @@ -46,8 +53,29 @@ func (cfg *CacheBackend) Validate() error {
return nil
}

type ChunkCacheBackend struct {
Backend string `yaml:"backend"`
InMemory InMemoryChunkCacheConfig `yaml:"inmemory"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
}

// Validate the config.
func (cfg *ChunkCacheBackend) Validate() error {
switch cfg.Backend {
case CacheBackendMemcached:
return cfg.Memcached.Validate()
case CacheBackendRedis:
return cfg.Redis.Validate()
case CacheBackendInMemory, "":
default:
return errUnsupportedChunkCacheBackend
}
return nil
}

type ChunksCacheConfig struct {
CacheBackend `yaml:",inline"`
ChunkCacheBackend `yaml:",inline"`

SubrangeSize int64 `yaml:"subrange_size"`
MaxGetRangeRequests int `yaml:"max_get_range_requests"`
Expand All @@ -56,10 +84,11 @@ type ChunksCacheConfig struct {
}

func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached))
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory))

cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")

f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
Expand All @@ -68,11 +97,34 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st
}

func (cfg *ChunksCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
return cfg.ChunkCacheBackend.Validate()
}

type InMemoryChunkCacheConfig struct {
MaxSizeBytes uint64 `yaml:"max_size_bytes"`
}

func (cfg *InMemoryChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory chunk cache used to speed up chunk lookups (shared between all tenants).")
}

func (cfg *InMemoryChunkCacheConfig) toInMemoryChunkCacheConfig() cache.InMemoryCacheConfig {
maxCacheSize := model.Bytes(cfg.MaxSizeBytes)

// Calculate the max item size.
maxItemSize := defaultMaxItemSize
if maxItemSize > maxCacheSize {
maxItemSize = maxCacheSize
}

return cache.InMemoryCacheConfig{
MaxSize: maxCacheSize,
MaxItemSize: maxItemSize,
}
}

type MetadataCacheConfig struct {
CacheBackend `yaml:",inline"`
MetadataCacheBackend `yaml:",inline"`

TenantsListTTL time.Duration `yaml:"tenants_list_ttl"`
TenantBlocksListTTL time.Duration `yaml:"tenant_blocks_list_ttl"`
Expand Down Expand Up @@ -107,14 +159,14 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix
}

func (cfg *MetadataCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
return cfg.MetadataCacheBackend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

chunksCache, err := createCache("chunks-cache", &chunksConfig.CacheBackend, logger, reg)
chunksCache, err := createChunkCache("chunks-cache", &chunksConfig.ChunkCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "chunks-cache")
}
Expand All @@ -124,7 +176,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
}

metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg)
metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "metadata-cache")
}
Expand Down Expand Up @@ -152,12 +204,42 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}

func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
// No caching.
return nil, nil
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create memcached client")
}
return cache.NewMemcachedCache(cacheName, logger, client, reg), nil

case CacheBackendRedis:
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create redis client")
}
return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil

default:
return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
}
}

func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
// No caching.
return nil, nil
case CacheBackendInMemory:
inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
if err != nil {
return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
}
return inMemoryCache, nil
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
Expand Down
51 changes: 51 additions & 0 deletions pkg/storage/tsdb/caching_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,57 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_ChunkCacheBackendValidation(t *testing.T) {
tests := map[string]struct {
cfg ChunkCacheBackend
expectedErr error
}{
"valid chunk cache type ('')": {
cfg: ChunkCacheBackend{
Backend: "",
},
expectedErr: nil,
},
"valid chunk cache type (in-memory)": {
cfg: ChunkCacheBackend{
Backend: CacheBackendInMemory,
},
expectedErr: nil,
},
"valid chunk cache type (memcached)": {
cfg: ChunkCacheBackend{
Backend: CacheBackendMemcached,
Memcached: MemcachedClientConfig{
Addresses: "dns+localhost:11211",
},
},
expectedErr: nil,
},
"valid chunk cache type (redis)": {
cfg: ChunkCacheBackend{
Backend: CacheBackendRedis,
Redis: RedisClientConfig{
Addresses: "localhost:6379",
},
},
expectedErr: nil,
},
"invalid chunk cache type": {
cfg: ChunkCacheBackend{
Backend: "dummy",
},
expectedErr: errUnsupportedChunkCacheBackend,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
err := tc.cfg.Validate()
assert.Equal(t, tc.expectedErr, err)
})
}
}

func TestIsTenantDir(t *testing.T) {
assert.False(t, isTenantBlocksDir(""))
assert.True(t, isTenantBlocksDir("test"))
Expand Down

0 comments on commit 53556fe

Please sign in to comment.