diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index fffcf6bbde..299634546a 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -141,7 +141,10 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") - multiTSDBOptions := []receive.MultiTSDBOption{} + multiTSDBOptions := []receive.MultiTSDBOption{ + receive.WithHeadExpandedPostingsCacheSize(conf.headExpandedPostingsCacheSize), + receive.WithBlockExpandedPostingsCacheSize(conf.compactedBlocksExpandedPostingsCacheSize), + } for _, feature := range *conf.featureList { if feature == metricNamesFilter { multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) @@ -886,6 +889,9 @@ type receiveConfig struct { asyncForwardWorkerCount uint featureList *[]string + + headExpandedPostingsCacheSize uint64 + compactedBlocksExpandedPostingsCacheSize uint64 } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -996,6 +1002,9 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").BoolVar(&rc.noLockFile) + cmd.Flag("tsdb.head.expanded-postings-cache-size", "[EXPERIMENTAL] If non-zero, enables expanded postings cache for the head block.").Default("0").Uint64Var(&rc.headExpandedPostingsCacheSize) + cmd.Flag("tsdb.block.expanded-postings-cache-size", "[EXPERIMENTAL] If non-zero, enables expanded postings cache for compacted blocks.").Default("0").Uint64Var(&rc.compactedBlocksExpandedPostingsCacheSize) + cmd.Flag("tsdb.max-exemplars", "Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant."+ " In case the exemplar storage becomes full (number of stored exemplars becomes equal to max-exemplars),"+ diff --git a/docs/components/receive.md b/docs/components/receive.md index cd0dc322c7..dfaddd73b1 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -552,6 +552,12 @@ Flags: Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge. Does not do anything, enabled all the time. + --tsdb.block.expanded-postings-cache-size=0 + [EXPERIMENTAL] If non-zero, enables expanded + postings cache for compacted blocks. + --tsdb.head.expanded-postings-cache-size=0 + [EXPERIMENTAL] If non-zero, enables expanded + postings cache for the head block. --tsdb.max-exemplars=0 Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant. In case the exemplar diff --git a/pkg/receive/expandedpostingscache/cache.go b/pkg/receive/expandedpostingscache/cache.go new file mode 100644 index 0000000000..d5c7069735 --- /dev/null +++ b/pkg/receive/expandedpostingscache/cache.go @@ -0,0 +1,414 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. 
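+//
+// Two caches are kept: one for the TSDB head, whose entries are invalidated
+// per metric name as series are created or deleted, and one for compacted
+// blocks. Their sizes are set with the receiver's
+// --tsdb.head.expanded-postings-cache-size and
+// --tsdb.block.expanded-postings-cache-size flags.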
+ +package expandedpostingscache + +import ( + "container/list" + "context" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/cespare/xxhash" + "github.com/oklog/ulid" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" +) + +type ExpandedPostingsCache interface { + PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + ExpireSeries(metric labels.Labels) + tsdb.SeriesLifecycleCallback +} + +type BlocksPostingsForMatchersCache struct { + strippedLock []sync.RWMutex + + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] + + headSeedByMetricName []int + postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + timeNow func() time.Time + + metrics *ExpandedPostingsCacheMetrics +} + +var ( + rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD") + headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") +) + +const ( + // size of the seed array. Each seed is a 64bits int (8 bytes) + // totaling 8mb. + seedArraySize = 1024 * 1024 + + numOfSeedsStripes = 512 +) + +type ExpandedPostingsCacheMetrics struct { + CacheRequests *prometheus.CounterVec + CacheHits *prometheus.CounterVec + CacheEvicts *prometheus.CounterVec + NonCacheableQueries *prometheus.CounterVec +} + +func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics { + return &ExpandedPostingsCacheMetrics{ + CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_requests_total", + Help: "Total number of requests to the cache.", + }, []string{"cache"}), + CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_hits_total", + Help: "Total number of hit requests to the cache.", + }, []string{"cache"}), + CacheEvicts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_evicts_total", + Help: "Total number of evictions in the cache, excluding items that got evicted.", + }, []string{"cache", "reason"}), + NonCacheableQueries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_non_cacheable_queries_total", + Help: "Total number of non cacheable queries.", + }, []string{"cache"}), + } +} + +func NewBlocksPostingsForMatchersCache(metrics *ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64) ExpandedPostingsCache { + return &BlocksPostingsForMatchersCache{ + headCache: newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, headExpandedPostingsCacheSize), + blocksCache: newFifoCache[[]storage.SeriesRef]("block", metrics, time.Now, blockExpandedPostingsCacheSize), + headSeedByMetricName: make([]int, seedArraySize), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), + postingsForMatchersFunc: tsdb.PostingsForMatchers, + timeNow: time.Now, + metrics: metrics, + } +} + +func (c *BlocksPostingsForMatchersCache) PostCreation(metric labels.Labels) { + c.ExpireSeries(metric) +} + +func (c *BlocksPostingsForMatchersCache) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) { + for _, 
metric := range metrics {
+		c.ExpireSeries(metric)
+	}
+}
+
+func (c *BlocksPostingsForMatchersCache) PreCreation(labels.Labels) error {
+	return nil
+}
+
+func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
+	var metricName string
+
+	metric.Range(func(l labels.Label) {
+		if l.Name != model.MetricNameLabel {
+			return
+		}
+		metricName = l.Value
+	})
+
+	if metricName == "" {
+		return
+	}
+
+	h := MemHashString(metricName)
+	i := h % uint64(len(c.headSeedByMetricName))
+	l := h % uint64(len(c.strippedLock))
+	c.strippedLock[l].Lock()
+	defer c.strippedLock[l].Unlock()
+	c.headSeedByMetricName[i]++
+}
+
+func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
+	return c.fetchPostings(blockID, ix, ms...)(ctx)
+}
+
+func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
+	var seed string
+	cache := c.blocksCache
+
+	// If this is a head block, let's add the seed to the cache key so we can
+	// invalidate the cache when new series are created for this metric name.
+	if isHeadBlock(blockID) {
+		cache = c.headCache
+
+		metricName, ok := metricNameFromMatcher(ms)
+		// Let's not cache head queries if we don't find an equal matcher for the label __name__.
+		if !ok {
+			c.metrics.NonCacheableQueries.WithLabelValues(cache.name).Inc()
+			return func(ctx context.Context) (index.Postings, error) {
+				return tsdb.PostingsForMatchers(ctx, ix, ms...)
+			}
+		}
+
+		seed = c.getSeedForMetricName(metricName)
+	}
+
+	c.metrics.CacheRequests.WithLabelValues(cache.name).Inc()
+
+	fetch := func() ([]storage.SeriesRef, int64, error) {
+		// Use context.Background() as this promise may be shared across calls.
+		postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...)
+
+		if err == nil {
+			ids, err := index.ExpandPostings(postings)
+			return ids, int64(len(ids) * 8), err
+		}
+
+		return nil, 0, err
+	}
+
+	key := c.cacheKey(seed, blockID, ms...)
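+	// For head blocks the key also embeds a per-metric-name seed; ExpireSeries
+	// bumps that seed when series for the metric are created or deleted, so any
+	// cached postings for it are implicitly invalidated on the next lookup.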
+ promise, loaded := cache.getPromiseForKey(key, fetch) + if loaded { + c.metrics.CacheHits.WithLabelValues(cache.name).Inc() + } + + return c.result(promise) +} + +func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { + return func(ctx context.Context) (index.Postings, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ce.done: + if ctx.Err() != nil { + return nil, ctx.Err() + } + return index.NewListPostings(ce.v), ce.err + } + } +} + +func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := h % uint64(len(c.strippedLock)) + c.strippedLock[l].RLock() + defer c.strippedLock[l].RUnlock() + return strconv.Itoa(c.headSeedByMetricName[i]) +} + +func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { + slices.SortFunc(ms, func(i, j *labels.Matcher) int { + if i.Type != j.Type { + return int(i.Type - j.Type) + } + if i.Name != j.Name { + return strings.Compare(i.Name, j.Name) + } + if i.Value != j.Value { + return strings.Compare(i.Value, j.Value) + } + return 0 + }) + + const ( + typeLen = 2 + sepLen = 1 + ) + + var size int + for _, m := range ms { + size += len(seed) + len(blockID.String()) + len(m.Name) + len(m.Value) + typeLen + 2*sepLen + } + sb := strings.Builder{} + sb.Grow(size) + sb.WriteString(seed) + sb.WriteByte('|') + sb.WriteString(blockID.String()) + for _, m := range ms { + sb.WriteString(m.Name) + sb.WriteString(m.Type.String()) + sb.WriteString(m.Value) + sb.WriteByte('|') + } + key := sb.String() + return key +} + +func isHeadBlock(blockID ulid.ULID) bool { + return blockID == rangeHeadULID || blockID == headULID +} + +func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { + for _, m := range ms { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + return m.Value, true + } + } + + return "", false +} + +// TODO(GiedriusS): convert Thanos caching system to be promised-based +// i.e. avoid multiple loads for same item. This is a copy from Cortex. +// Use as an inspiration. 
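+//
+// fifoCache is promise-based: the first caller for a key runs fetch and stores
+// a cacheEntryPromise, while concurrent callers for the same key wait on the
+// promise's done channel instead of fetching again. Entries are evicted in
+// FIFO order once cachedBytes exceeds maxBytes, and lazily once their TTL
+// expires. Illustrative use (fetchFn is any func() (V, int64, error)):
+//
+//	p, loaded := cache.getPromiseForKey("key", fetchFn)
+//	// loaded == true means the result was served from an existing promise.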
+type fifoCache[V any] struct {
+	cachedValues *sync.Map
+	timeNow      func() time.Time
+	name         string
+	metrics      ExpandedPostingsCacheMetrics
+
+	ttl      time.Duration
+	maxBytes int64
+
+	// Fields below must be accessed with cachedMtx held.
+	cachedMtx   sync.RWMutex
+	cached      *list.List
+	cachedBytes int64
+}
+
+func newFifoCache[V any](name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
+	return &fifoCache[V]{
+		cachedValues: new(sync.Map),
+		cached:       list.New(),
+		timeNow:      timeNow,
+		name:         name,
+		metrics:      *metrics,
+		ttl:          10 * time.Minute,
+		maxBytes:     int64(maxBytes),
+	}
+}
+
+func (c *fifoCache[V]) expire() {
+	if c.ttl.Seconds() <= 0 {
+		return
+	}
+	c.cachedMtx.RLock()
+	if _, r := c.shouldEvictHead(); !r {
+		c.cachedMtx.RUnlock()
+		return
+	}
+	c.cachedMtx.RUnlock()
+	c.cachedMtx.Lock()
+	defer c.cachedMtx.Unlock()
+	for reason, r := c.shouldEvictHead(); r; reason, r = c.shouldEvictHead() {
+		c.metrics.CacheEvicts.WithLabelValues(c.name, reason).Inc()
+		c.evictHead()
+	}
+}
+
+func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
+	r := &cacheEntryPromise[V]{
+		done: make(chan struct{}),
+	}
+	defer close(r.done)
+
+	loaded, ok := c.cachedValues.LoadOrStore(k, r)
+
+	if !ok {
+		r.v, r.sizeBytes, r.err = fetch()
+		r.sizeBytes += int64(len(k))
+		r.ts = c.timeNow()
+		c.created(k, r.sizeBytes)
+		c.expire()
+	}
+
+	if ok {
+		// If the promise is already in the cache, let's wait for it to finish fetching the data.
+		<-loaded.(*cacheEntryPromise[V]).done
+
+		// If it is cached but expired, let's try to replace the cached value.
+		if loaded.(*cacheEntryPromise[V]).isExpired(c.ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
+			c.metrics.CacheEvicts.WithLabelValues(c.name, "expired").Inc()
+			r.v, r.sizeBytes, r.err = fetch()
+			r.sizeBytes += int64(len(k))
+			c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
+			loaded = r
+			r.ts = c.timeNow()
+			ok = false
+		}
+	}
+
+	return loaded.(*cacheEntryPromise[V]), ok
+}
+
+func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
+	h := c.cached.Front()
+	if h == nil {
+		return "", false
+	}
+
+	if c.cachedBytes > c.maxBytes {
+		return "full", true
+	}
+	key := h.Value.(string)
+
+	if l, ok := c.cachedValues.Load(key); ok {
+		return "expired", l.(*cacheEntryPromise[V]).isExpired(c.ttl, c.timeNow())
+	}
+
+	return "", false
+}
+
+func (c *fifoCache[V]) evictHead() {
+	front := c.cached.Front()
+	c.cached.Remove(front)
+	oldestKey := front.Value.(string)
+	if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded {
+		c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes
+	}
+}
+
+func (c *fifoCache[V]) created(key string, sizeBytes int64) {
+	if c.ttl <= 0 {
+		c.cachedValues.Delete(key)
+		return
+	}
+	c.cachedMtx.Lock()
+	defer c.cachedMtx.Unlock()
+	c.cached.PushBack(key)
+	c.cachedBytes += sizeBytes
+}
+
+func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
+	if oldSize == newSizeBytes {
+		return
+	}
+
+	c.cachedMtx.Lock()
+	defer c.cachedMtx.Unlock()
+	c.cachedBytes += newSizeBytes - oldSize
+}
+
+func (c *fifoCache[V]) contains(k string) bool {
+	_, ok := c.cachedValues.Load(k)
+	return ok
+}
+
+type cacheEntryPromise[V any] struct {
+	ts        time.Time
+	sizeBytes int64
+
+	done chan struct{}
+	v    V
+	err  error
+}
+
+func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
+	ts := ce.ts
+	r := now.Sub(ts)
+	return r >= ttl
+}
+
+func MemHashString(str string) uint64 {
+	return 
xxhash.Sum64String(str) +} diff --git a/pkg/receive/expandedpostingscache/cache_test.go b/pkg/receive/expandedpostingscache/cache_test.go new file mode 100644 index 0000000000..a2ba2dafd2 --- /dev/null +++ b/pkg/receive/expandedpostingscache/cache_test.go @@ -0,0 +1,168 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. +// +//nolint:unparam +package expandedpostingscache + +import ( + "bytes" + "fmt" + "strings" + "sync" + "testing" + "time" + + "go.uber.org/atomic" + + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) + cache := newFifoCache[int]("test", m, time.Now, 10<<20) + calls := atomic.Int64{} + concurrency := 100 + wg := sync.WaitGroup{} + wg.Add(concurrency) + + fetchFunc := func() (int, int64, error) { + calls.Inc() + time.Sleep(100 * time.Millisecond) + return 0, 0, nil //nolint:unparam + } + + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + cache.getPromiseForKey("key1", fetchFunc) + }() + } + + wg.Wait() + require.Equal(t, int64(1), calls.Load()) + +} + +func TestFifoCacheExpire(t *testing.T) { + + keySize := 20 + numberOfKeys := 100 + + tc := map[string]struct { + ttl time.Duration + maxBytes uint64 + expectedFinalItems int + ttlExpire bool + }{ + "MaxBytes": { + expectedFinalItems: 10, + ttl: time.Hour, + maxBytes: uint64(10 * (8 + keySize)), + }, + "TTL": { + expectedFinalItems: numberOfKeys, + ttlExpire: true, + ttl: time.Hour, + maxBytes: 10 << 20, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + timeNow := time.Now + cache := newFifoCache[int]("test", m, timeNow, c.maxBytes) + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 8, nil + }) + require.False(t, loaded) + require.Equal(t, 1, p.v) + require.True(t, cache.contains(key)) + p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 0, nil + }) + require.True(t, loaded) + require.Equal(t, 1, p.v) + } + + totalCacheSize := 0 + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + if cache.contains(key) { + totalCacheSize++ + } + } + + require.Equal(t, c.expectedFinalItems, totalCacheSize) + + if c.expectedFinalItems != numberOfKeys { + err := promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. 
+ # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="full"} %v +`, numberOfKeys-c.expectedFinalItems)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + + } + + if c.ttlExpire { + cache.timeNow = func() time.Time { + return timeNow().Add(2 * c.ttl) + } + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + originalSize := cache.cachedBytes + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 2, 18, nil + }) + require.False(t, loaded) + // New value + require.Equal(t, 2, p.v) + // Total Size Updated + require.Equal(t, originalSize+10, cache.cachedBytes) + } + + err := promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. + # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="expired"} %v +`, numberOfKeys)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + + cache.timeNow = func() time.Time { + return timeNow().Add(5 * c.ttl) + } + + cache.getPromiseForKey("newKwy", func() (int, int64, error) { + return 2, 18, nil + }) + + // Should expire all keys again as ttl is expired + err = promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. + # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="expired"} %v +`, numberOfKeys*2)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + } + }) + } +} + +func repeatStringIfNeeded(seed string, length int) string { + if len(seed) > length { + return seed + } + + return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))] +} diff --git a/pkg/receive/expandedpostingscache/tsdb.go b/pkg/receive/expandedpostingscache/tsdb.go new file mode 100644 index 0000000000..09d9276575 --- /dev/null +++ b/pkg/receive/expandedpostingscache/tsdb.go @@ -0,0 +1,146 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. 
+ +package expandedpostingscache + +import ( + "context" + "errors" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + prom_tsdb "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" +) + +/* + This file is basically a copy from https://github.com/prometheus/prometheus/blob/e2e01c1cffbfc4f26f5e9fe6138af87d7ff16122/tsdb/querier.go + with the difference that the PostingsForMatchers function is called from the Postings Cache +*/ + +type blockBaseQuerier struct { + blockID ulid.ULID + index prom_tsdb.IndexReader + chunks prom_tsdb.ChunkReader + tombstones tombstones.Reader + + closed bool + + mint, maxt int64 +} + +func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { + indexr, err := b.Index() + if err != nil { + return nil, fmt.Errorf("open index reader: %w", err) + } + chunkr, err := b.Chunks() + if err != nil { + indexr.Close() + return nil, fmt.Errorf("open chunk reader: %w", err) + } + tombsr, err := b.Tombstones() + if err != nil { + indexr.Close() + chunkr.Close() + return nil, fmt.Errorf("open tombstone reader: %w", err) + } + + if tombsr == nil { + tombsr = tombstones.NewMemTombstones() + } + return &blockBaseQuerier{ + blockID: b.Meta().ULID, + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil +} + +func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.SortedLabelValues(ctx, name, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.LabelNames(ctx, matchers...) 
+ return res, nil, err +} + +func (q *blockBaseQuerier) Close() error { + if q.closed { + return errors.New("block querier already closed") + } + + errs := tsdb_errors.NewMulti( + q.index.Close(), + q.chunks.Close(), + q.tombstones.Close(), + ) + q.closed = true + return errs.Err() +} + +type cachedBlockChunkQuerier struct { + *blockBaseQuerier + + cache ExpandedPostingsCache +} + +func NewCachedBlockChunkQuerier(cache ExpandedPostingsCache, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := newBlockBaseQuerier(b, mint, maxt) + if err != nil { + return nil, err + } + return &cachedBlockChunkQuerier{blockBaseQuerier: q, cache: cache}, nil +} + +func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt, q.cache) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, ir prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64, + cache ExpandedPostingsCache, +) storage.ChunkSeriesSet { + disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + + if hints != nil { + mint = hints.Start + maxt = hints.End + disableTrimming = hints.DisableTrimming + } + + var postings index.Postings + if cache != nil { + p, err := cache.PostingsForMatchers(ctx, blockID, ir, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + postings = p + } else { + p, err := prom_tsdb.PostingsForMatchers(ctx, ir, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + postings = p + } + + if sharded { + postings = ir.ShardedPostings(postings, hints.ShardIndex, hints.ShardCount) + } + if sortSeries { + postings = ir.SortedPostings(postings) + } + return prom_tsdb.NewBlockChunkSeriesSet(blockID, ir, chunks, tombstones, postings, mint, maxt, disableTrimming) +} diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 26e6284b73..1997e0fd38 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -34,7 +34,9 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/exemplars" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/receive/expandedpostingscache" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -69,6 +71,9 @@ type MultiTSDB struct { exemplarClientsNeedUpdate bool metricNameFilterEnabled bool + + headExpandedPostingsCacheSize uint64 + blockExpandedPostingsCacheSize uint64 } // MultiTSDBOption is a functional option for MultiTSDB. @@ -81,6 +86,18 @@ func WithMetricNameFilterEnabled() MultiTSDBOption { } } +func WithHeadExpandedPostingsCacheSize(size uint64) MultiTSDBOption { + return func(s *MultiTSDB) { + s.headExpandedPostingsCacheSize = size + } +} + +func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { + return func(s *MultiTSDB) { + s.blockExpandedPostingsCacheSize = size + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). 
func NewMultiTSDB( @@ -687,9 +704,27 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant dataDir := t.defaultTenantDataDir(tenantID) level.Info(logger).Log("msg", "opening TSDB") + + var expandedPostingsCache expandedpostingscache.ExpandedPostingsCache + if t.headExpandedPostingsCacheSize > 0 || t.blockExpandedPostingsCacheSize > 0 { + var expandedPostingsCacheMetrics = expandedpostingscache.NewPostingCacheMetrics(extprom.WrapRegistererWithPrefix("thanos_", reg)) + + expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize) + } + opts := *t.tsdbOpts opts.BlocksToDelete = tenant.blocksToDelete opts.EnableDelayedCompaction = true + + opts.BlockChunkQuerierFunc = func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + if expandedPostingsCache != nil { + return expandedpostingscache.NewCachedBlockChunkQuerier(expandedPostingsCache, b, mint, maxt) + } + return tsdb.NewBlockChunkQuerier(b, mint, maxt) + } + if expandedPostingsCache != nil { + opts.SeriesLifecycleCallback = expandedPostingsCache + } tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 0c3e7641ba..7e960845f3 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -5,8 +5,10 @@ package storepb import ( "context" + "fmt" "io" "iter" + "runtime/debug" "google.golang.org/grpc" ) @@ -92,6 +94,12 @@ func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) { var srvIter iter.Seq2[*SeriesResponse, error] = func(yield func(*SeriesResponse, error) bool) { + defer func() { + if r := recover(); r != nil { + st := debug.Stack() + panic(fmt.Sprintf("panic %v in server iterator: %s", r, st)) + } + }() srv := newInProcessServer(ctx, yield) err := s.srv.Series(in, srv) if err != nil { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62..b4448aa633 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -538,20 +538,21 @@ type ReceiveBuilder struct { f e2e.FutureRunnable - maxExemplars int - capnp bool - ingestion bool - limit int - tenantsLimits receive.TenantsWriteLimitsConfig - metaMonitoring string - metaMonitoringQuery string - hashringConfigs []receive.HashringConfig - relabelConfigs []*relabel.Config - replication int - image string - nativeHistograms bool - labels []string - tenantSplitLabel string + maxExemplars int + capnp bool + ingestion bool + expandedPostingsCache bool + limit int + tenantsLimits receive.TenantsWriteLimitsConfig + metaMonitoring string + metaMonitoringQuery string + hashringConfigs []receive.HashringConfig + relabelConfigs []*relabel.Config + replication int + image string + nativeHistograms bool + labels []string + tenantSplitLabel string } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -582,6 +583,11 @@ func (r *ReceiveBuilder) WithIngestionEnabled() *ReceiveBuilder { return r } +func (r *ReceiveBuilder) WithExpandedPostingsCache() *ReceiveBuilder { + r.expandedPostingsCache = true + return r +} + func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder { r.labels = 
append(r.labels, fmt.Sprintf(`%s="%s"`, name, value)) return r @@ -661,6 +667,11 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable { args["--receive.local-endpoint"] = r.InternalEndpoint("grpc") } + if r.expandedPostingsCache { + args["--tsdb.head.expanded-postings-cache-size"] = "1000" + args["--tsdb.block.expanded-postings-cache-size"] = "1000" + } + if r.limit != 0 && r.metaMonitoring != "" { cfg := receive.RootLimitsConfig{ WriteLimits: receive.WriteLimitsConfig{ diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index c938a4f040..7d841e9a7e 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -80,7 +80,7 @@ func TestReceive(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup Router Ingestor. - i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init() + i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExpandedPostingsCache().Init() testutil.Ok(t, e2e.StartAndWaitReady(i)) // Setup Prometheus @@ -135,9 +135,9 @@ func TestReceive(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup Receives - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().Init() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().Init() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled().Init() + r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().WithExpandedPostingsCache().Init() + r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().WithExpandedPostingsCache().Init() + r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled().WithExpandedPostingsCache().Init() testutil.Ok(t, e2e.StartAndWaitReady(r1, r2, r3)) @@ -291,9 +291,9 @@ test_metric{a="2", b="2"} 1`) t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup 3 ingestors. - i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init() - i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init() - i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init() + i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().WithExpandedPostingsCache().Init() + i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().WithExpandedPostingsCache().Init() + i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().WithExpandedPostingsCache().Init() h := receive.HashringConfig{ Endpoints: []receive.Endpoint{