Merge pull request cortexproject#5661 from yeya24/async-backfilling
Make multilevel cache backfilling async
yeya24 authored Nov 21, 2023
2 parents 050632f + 3c231a1 commit 72ba1d5
Showing 19 changed files with 208 additions and 82 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Azure Storage: Upgraded objstore dependency and support Azure Workload Identity Authentication. Added `connection_string` to support authenticating via SAS token. Marked `msi_resource` config as deprecating. #5645
* [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
* [CHANGE] Index Cache: Multilevel cache backfilling is now asynchronous. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs, and the metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` tracking the number of dropped items. #5661
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638

11 changes: 11 additions & 0 deletions docs/blocks-storage/querier.md
@@ -708,6 +708,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can occur
# when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
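For reference, a minimal sketch of how these options fit into a two-level setup. The backend list and memcached address below are illustrative assumptions, not taken from this change:

```yaml
blocks_storage:
  bucket_store:
    index_cache:
      # Listing two backends enables the multilevel cache; the order defines the levels.
      backend: "inmemory,memcached"
      memcached:
        addresses: "dns+memcached.namespace.svc:11211"  # assumed endpoint
      multilevel:
        max_async_concurrency: 50     # workers draining the backfill queue
        max_async_buffer_size: 10000  # queued backfill operations before items are dropped
```

Raising `max_async_buffer_size` trades memory for fewer dropped backfills; the new counter metric (see below) makes the drop rate visible.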
11 changes: 11 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -823,6 +823,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can occur
# when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1257,6 +1257,17 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can occur when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
2 changes: 1 addition & 1 deletion go.mod
@@ -53,7 +53,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.10
4 changes: 2 additions & 2 deletions go.sum
@@ -1519,8 +1519,8 @@ github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 h1:gx2MTto1UQRu
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98/go.mod h1:JauBAcJ61tRSv9widgISVmA6akQXDeUMXBrVmWW4xog=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q=
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY=
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY=
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e h1:ej5fKlojY+r8qty//Q4b7nyNA4QEkJ5uWms77Itf75E=
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e/go.mod h1:qeDC74QOf5hWzTlvIrLT8WlNGg67nORFON0T2VF4qgg=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
@@ -256,7 +256,8 @@ type userTSDB struct {
lastUpdate atomic.Int64

// Thanos shipper used to ship blocks to the storage.
shipper Shipper
shipper Shipper
shipperMetadataFilePath string

// When deletion marker is found for the tenant (checked before shipping),
// shipping stops and TSDB is closed before reaching idle timeout time (if enabled).
@@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {

// updateCachedShippedBlocks reads the shipper meta file and updates the cached shipped blocks.
func (u *userTSDB) updateCachedShippedBlocks() error {
shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath)
if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) {
// If the meta file doesn't exist it means the shipper hasn't run yet.
shipperMeta = &shipper.Meta{}
@@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
}
}

// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
// New returns a new Ingester that uses Cortex block storage instead of chunks storage.
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
defaultInstanceLimits = &cfg.DefaultLimits
if cfg.ingesterClientFactory == nil {
@@ -2050,7 +2051,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
},
true, // Allow out of order uploads. It's fine in Cortex's context.
metadata.NoneFunc,
"",
)
userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename))

// Initialise the shipper blocks cache.
if err := userDB.updateCachedShippedBlocks(); err != nil {
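The ingester changes above track a Thanos shipper API update: `ReadMetaFile`/`WriteMetaFile` now take the path of the meta file rather than the TSDB directory, and `shipper.New` appears to have gained a metadata-filename argument (the `""` added above). A small sketch of the new call pattern; the directory is an assumed example value:

```go
package main

import (
	"fmt"
	"path/filepath"

	"github.com/thanos-io/thanos/pkg/shipper"
)

func main() {
	dir := "/data/tsdb/user-1" // assumed TSDB directory for one tenant

	// Compute the full path once, as userDB.shipperMetadataFilePath does above.
	metaPath := filepath.Join(dir, filepath.Clean(shipper.DefaultMetaFilename))

	meta, err := shipper.ReadMetaFile(metaPath) // now takes the file path, not the dir
	if err != nil {
		fmt.Println("shipper hasn't written a meta file yet:", err)
		return
	}
	fmt.Printf("%d blocks shipped so far\n", len(meta.Uploaded))
}
```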
8 changes: 4 additions & 4 deletions pkg/ingester/ingester_test.go
@@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) {
require.Equal(t, len(db.getCachedShippedBlocks()), 0)
shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000")

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{shippedBlock},
}))
@@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

// Mock the shipper meta (no blocks).
db := i.getTSDB(userID)
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
}))

@@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Saying that we have shipped the second block, so only that should get deleted.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
}))
@@ -3816,7 +3816,7 @@
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Shipping 2 more blocks, hence all the blocks from first round.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
}))
40 changes: 35 additions & 5 deletions pkg/storage/tsdb/index_cache.go
@@ -42,13 +42,16 @@ var (
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
errDuplicatedIndexCacheBackend = errors.New("duplicated index cache backend")
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must be greater than 0")
errInvalidMaxAsyncBufferSize = errors.New("invalid max_async_buffer_size, must be greater than 0")
)

type IndexCacheConfig struct {
Backend string `yaml:"backend"`
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
Memcached MemcachedIndexCacheConfig `yaml:"memcached"`
Redis RedisIndexCacheConfig `yaml:"redis"`
Backend string `yaml:"backend"`
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
Memcached MemcachedIndexCacheConfig `yaml:"memcached"`
Redis RedisIndexCacheConfig `yaml:"redis"`
MultiLevel MultiLevelIndexCacheConfig `yaml:"multilevel"`
}

func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
@@ -64,6 +67,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
}

// Validate the config.
@@ -72,6 +76,12 @@ func (cfg *IndexCacheConfig) Validate() error {
splitBackends := strings.Split(cfg.Backend, ",")
configuredBackends := map[string]struct{}{}

if len(splitBackends) > 1 {
if err := cfg.MultiLevel.Validate(); err != nil {
return err
}
}

for _, backend := range splitBackends {
if !util.StringsContain(supportedIndexCacheBackends, backend) {
return errUnsupportedIndexCacheBackend
@@ -101,6 +111,26 @@ func (cfg *IndexCacheConfig) Validate() error {
return nil
}

type MultiLevelIndexCacheConfig struct {
MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
MaxAsyncBufferSize int `yaml:"max_async_buffer_size"`
}

func (cfg *MultiLevelIndexCacheConfig) Validate() error {
if cfg.MaxAsyncBufferSize <= 0 {
return errInvalidMaxAsyncBufferSize
}
if cfg.MaxAsyncConcurrency <= 0 {
return errInvalidMaxAsyncConcurrency
}
return nil
}

func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations that can occur when backfilling cache items.")
f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
}

type InMemoryIndexCacheConfig struct {
MaxSizeBytes uint64 `yaml:"max_size_bytes"`
EnabledItems []string `yaml:"enabled_items"`
@@ -210,7 +240,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
}
}

return newMultiLevelCache(registerer, caches...), nil
return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
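A quick usage sketch of the validation added above: the multilevel knobs are only checked when more than one backend is listed, and zero or negative values are rejected before the per-backend checks run. The values here are illustrative:

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Two backends => the multilevel config is validated first.
	cfg := tsdb.IndexCacheConfig{Backend: "inmemory,memcached"}
	cfg.MultiLevel.MaxAsyncConcurrency = 0 // invalid: must be greater than 0
	cfg.MultiLevel.MaxAsyncBufferSize = 10000

	if err := cfg.Validate(); err != nil {
		fmt.Println(err) // invalid max_async_concurrency, must be greater than 0
	}
}
```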
62 changes: 41 additions & 21 deletions pkg/storage/tsdb/multilevel_cache.go
@@ -2,13 +2,15 @@ package tsdb

import (
"context"
"errors"
"sync"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/cacheutil"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

@@ -21,8 +23,10 @@ const (
type multiLevelCache struct {
caches []storecache.IndexCache

fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
backfillProcessor *cacheutil.AsyncOperationProcessor
backfillDroppedItems *prometheus.CounterVec
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -44,9 +48,11 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U

misses = keys
hits = map[labels.Label][]byte{}
backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1)
for i, c := range m.caches {
backfillMap[c] = []map[labels.Label][]byte{}
if i < len(m.caches)-1 {
backfillItems[i] = []map[labels.Label][]byte{}
}
if ctx.Err() != nil {
return
}
@@ -58,7 +64,7 @@
}

if i > 0 {
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
backfillItems[i-1] = append(backfillItems[i-1], h)
}

if len(misses) == 0 {
@@ -69,13 +75,14 @@
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for i, hit := range backfillItems {
for _, values := range hit {
for l, b := range values {
if ctx.Err() != nil {
return
for lbl, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StorePostings(blockID, lbl, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
}
cache.StorePostings(blockID, l, b, tenant)
}
}
}
@@ -108,7 +115,11 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
if i > 0 {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
}
backFillTimer.ObserveDuration()
}
return d, h
@@ -137,10 +148,12 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1)

for i, c := range m.caches {
backfillMap[c] = []map[storage.SeriesRef][]byte{}
if i < len(m.caches)-1 {
backfillItems[i] = []map[storage.SeriesRef][]byte{}
}
if ctx.Err() != nil {
return
}
@@ -152,7 +165,7 @@
}

if i > 0 && len(h) > 0 {
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
backfillItems[i-1] = append(backfillItems[i-1], h)
}

if len(misses) == 0 {
@@ -163,13 +176,14 @@
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for i, hit := range backfillItems {
for _, values := range hit {
for m, b := range values {
if ctx.Err() != nil {
return
for ref, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StoreSeries(blockID, ref, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
}
cache.StoreSeries(blockID, m, b, tenant)
}
}
}
Expand All @@ -178,12 +192,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
return hits, misses
}

func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
if len(c) == 1 {
return c[0]
}

return &multiLevelCache{
caches: c,
caches: c,
backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
Help: "Histogram to track latency to fetch items from multi level index cache",
@@ -194,5 +210,9 @@ func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) s
Help: "Histogram to track latency to backfill items from multi level index cache",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
}, []string{"item_type"}),
backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
}, []string{"item_type"}),
}
}
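The heart of the change is the drop-on-full enqueue: backfills no longer block the read path, and overflow is observable via `rate(cortex_store_multilevel_index_cache_backfill_dropped_items_total[5m])`, which should stay near zero in a healthy setup. Below is a rough sketch of the pattern behind Thanos' `cacheutil.AsyncOperationProcessor`, written from scratch for illustration, not the actual implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAsyncBufferFull = errors.New("the async buffer is full")

// asyncProcessor drains a bounded queue of operations with a fixed
// number of workers; EnqueueAsync never blocks the caller.
type asyncProcessor struct {
	queue chan func()
	wg    sync.WaitGroup
}

func newAsyncProcessor(bufferSize, concurrency int) *asyncProcessor {
	p := &asyncProcessor{queue: make(chan func(), bufferSize)}
	p.wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer p.wg.Done()
			for op := range p.queue {
				op()
			}
		}()
	}
	return p
}

// EnqueueAsync reports a full buffer instead of blocking; in the diff
// above this is the branch that increments the new dropped-items counter.
func (p *asyncProcessor) EnqueueAsync(op func()) error {
	select {
	case p.queue <- op:
		return nil
	default:
		return errAsyncBufferFull
	}
}

func main() {
	p := newAsyncProcessor(2, 1)
	for i := 0; i < 4; i++ {
		i := i
		if err := p.EnqueueAsync(func() { fmt.Println("backfill", i) }); err != nil {
			fmt.Println("dropped", i) // drops depend on timing; buffer is tiny here
		}
	}
	close(p.queue)
	p.wg.Wait()
}
```

Sizing the buffer and worker count is the same trade-off exposed by the two new flags: a larger buffer absorbs bursts at the cost of memory, while more workers drain it faster at the cost of extra writes to the slower cache level.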
