From 78a9e351d95d29da096422caa3e4afb5e4a8cea1 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 20 Nov 2024 14:30:25 -0800 Subject: [PATCH] Upgrade thanos version to fix store gateway buf reuse issue (#6346) * upgrade thanos version to fix store gateway buf reuse issue Signed-off-by: Ben Ye * change to always flush old when resolving DNS records Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- go.mod | 2 +- go.sum | 4 +- pkg/chunk/cache/memcached_client.go | 2 +- pkg/compactor/compactor.go | 1 + pkg/querier/blocks_store_balanced_set.go | 2 +- pkg/ring/bench/ring_memberlist_test.go | 2 +- pkg/ring/kv/memberlist/dnsprovider.go | 2 +- pkg/ring/kv/memberlist/memberlist_client.go | 2 +- .../kv/memberlist/memberlist_client_test.go | 2 +- .../thanos-io/thanos/pkg/cache/groupcache.go | 2 +- .../thanos/pkg/cacheutil/memcached_client.go | 4 +- .../thanos-io/thanos/pkg/clientconfig/http.go | 4 +- .../thanos-io/thanos/pkg/compact/compact.go | 21 ++++++-- .../thanos/pkg/discovery/dns/grpc.go | 36 +++++++++---- .../thanos/pkg/discovery/dns/provider.go | 15 +++--- .../thanos/pkg/discovery/memcache/provider.go | 17 ++++--- .../thanos-io/thanos/pkg/extpromql/parser.go | 11 +--- .../thanos/pkg/promclient/promclient.go | 51 +++++++++++++++++-- .../thanos/pkg/query/remote_engine.go | 3 -- .../thanos-io/thanos/pkg/store/bucket.go | 13 ++--- vendor/modules.txt | 2 +- 21 files changed, 132 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index 9e1eaed558..f28daee44b 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 - github.com/thanos-io/thanos v0.35.2-0.20241029125830-62038110b1bc + github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index 83dedf83f6..7cde3621fd 100644 --- a/go.sum +++ b/go.sum @@ -1687,8 +1687,8 @@ github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqX github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4= github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g+3EPohdw4cv+6jv5+LcX6LynhHvQcYwTAMxQ= github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.35.2-0.20241029125830-62038110b1bc h1:AoIDFFb3xjED+mmf5g6bnULDgNJWeXtKyjAQ1CKtqfk= -github.com/thanos-io/thanos v0.35.2-0.20241029125830-62038110b1bc/go.mod h1:6Q6xfe4mUYCYbrixod2dNZqKCK4J9pGRKqF29/cUr/A= +github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 h1:khQQmo9VMMphKm10NS22iI8bAIf4DNRFvLoP4IR7kK4= +github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171/go.mod h1:v2AUVRd44iuv/PY0iwzKzgosLusUg7/dnnw5KEe1Has= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= diff --git a/pkg/chunk/cache/memcached_client.go b/pkg/chunk/cache/memcached_client.go index 7d543f822a..39a60f27af 100644 --- a/pkg/chunk/cache/memcached_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -243,7 +243,7 @@ func (c *memcachedClient) updateMemcacheServers() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := c.provider.Resolve(ctx, c.addresses); err != nil { + if err := c.provider.Resolve(ctx, c.addresses, true); err != nil { return err } servers = c.provider.Addresses() diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 817f93572b..5345bd9076 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -918,6 +918,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { fetcher, deduplicateBlocksFilter, ignoreDeletionMarkFilter, + c.compactorCfg.CompactionInterval, ) if err != nil { return errors.Wrap(err, "failed to create syncer") diff --git a/pkg/querier/blocks_store_balanced_set.go b/pkg/querier/blocks_store_balanced_set.go index 44b52eff8c..4b50f75120 100644 --- a/pkg/querier/blocks_store_balanced_set.go +++ b/pkg/querier/blocks_store_balanced_set.go @@ -54,7 +54,7 @@ func (s *blocksStoreBalancedSet) starting(ctx context.Context) error { } func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error { - if err := s.dnsProvider.Resolve(ctx, s.serviceAddresses); err != nil { + if err := s.dnsProvider.Resolve(ctx, s.serviceAddresses, true); err != nil { level.Error(s.logger).Log("msg", "failed to resolve store-gateway addresses", "err", err, "addresses", s.serviceAddresses) } return nil diff --git a/pkg/ring/bench/ring_memberlist_test.go b/pkg/ring/bench/ring_memberlist_test.go index 085a4d3107..1366c47aa4 100644 --- a/pkg/ring/bench/ring_memberlist_test.go +++ b/pkg/ring/bench/ring_memberlist_test.go @@ -21,7 +21,7 @@ type dnsProviderMock struct { resolved []string } -func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld bool) error { p.resolved = addrs return nil } diff --git a/pkg/ring/kv/memberlist/dnsprovider.go b/pkg/ring/kv/memberlist/dnsprovider.go index b51a5d0553..2f98a91c93 100644 --- a/pkg/ring/kv/memberlist/dnsprovider.go +++ b/pkg/ring/kv/memberlist/dnsprovider.go @@ -8,7 +8,7 @@ import ( type DNSProvider interface { // Resolve stores a list of provided addresses or their DNS records if requested. // Implementations may have specific ways of interpreting addresses. - Resolve(ctx context.Context, addrs []string) error + Resolve(ctx context.Context, addrs []string, flushOld bool) error // Addresses returns the latest addresses present in the DNSProvider. Addresses() []string diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 59a828e48f..69f3bfd5ba 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -596,7 +596,7 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { } } - err := m.provider.Resolve(ctx, resolve) + err := m.provider.Resolve(ctx, resolve, true) if err != nil { level.Error(m.logger).Log("msg", "failed to resolve members", "addrs", strings.Join(resolve, ",")) } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index d94e88e5f5..6c33b85441 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -1350,7 +1350,7 @@ type dnsProviderMock struct { resolved []string } -func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld bool) error { p.resolved = addrs return nil } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go index 609959d0cb..43bf1aeefe 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go @@ -134,7 +134,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf go func() { for { - if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil { + if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err) } else { err := universe.Set(dnsGroupcacheProvider.Addresses()...) diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go index a5b0c5b2a4..e700ffee7b 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go @@ -211,7 +211,7 @@ type memcachedClient struct { // AddressProvider performs node address resolution given a list of clusters. type AddressProvider interface { // Resolves the provided list of memcached cluster to the actual nodes - Resolve(context.Context, []string) error + Resolve(context.Context, []string, bool) error // Returns the nodes Addresses() []string @@ -638,7 +638,7 @@ func (c *memcachedClient) resolveAddrs() error { defer cancel() // If some of the dns resolution fails, log the error. - if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil { + if err := c.addressProvider.Resolve(ctx, c.config.Addresses, true); err != nil { level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err) } // Fail in case no server address is resolved. diff --git a/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go index 69f2baf165..4493ebc60b 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go +++ b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go @@ -330,7 +330,7 @@ func (c HTTPFileSDConfig) convert() (file.SDConfig, error) { } type AddressProvider interface { - Resolve(context.Context, []string) error + Resolve(context.Context, []string, bool) error Addresses() []string } @@ -433,5 +433,5 @@ func (c *HTTPClient) Discover(ctx context.Context) { // Resolve refreshes and resolves the list of targets. func (c *HTTPClient) Resolve(ctx context.Context) error { - return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...)) + return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...), true) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go index 7f08297671..a20544b2f4 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go @@ -61,6 +61,7 @@ type Syncer struct { metrics *SyncerMetrics duplicateBlocksFilter block.DeduplicateFilter ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter + syncMetasTimeout time.Duration g singleflight.Group } @@ -101,15 +102,23 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { - return NewMetaSyncerWithMetrics(logger, NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), bkt, fetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter) +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, syncMetasTimeout time.Duration) (*Syncer, error) { + return NewMetaSyncerWithMetrics(logger, + NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), + bkt, + fetcher, + duplicateBlocksFilter, + ignoreDeletionMarkFilter, + syncMetasTimeout, + ) } -func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter) (*Syncer, error) { +func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, syncMetasTimeout time.Duration) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ + syncMetasTimeout: syncMetasTimeout, logger: logger, bkt: bkt, fetcher: fetcher, @@ -138,6 +147,12 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { // SyncMetas synchronizes local state of block metas with what we have in the bucket. func (s *Syncer) SyncMetas(ctx context.Context) error { + var cancel func() = func() {} + if s.syncMetasTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, s.syncMetasTimeout) + } + defer cancel() + type metasContainer struct { metas map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go index 4e315596df..79e832b652 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/go-kit/log" grpcresolver "google.golang.org/grpc/resolver" ) @@ -19,12 +20,14 @@ var ( type builder struct { resolveInterval time.Duration provider *Provider + logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration) { +func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, + logger: logger, }) } @@ -39,6 +42,7 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp cancel: cancel, cc: cc, interval: b.resolveInterval, + logger: b.logger, } r.wg.Add(1) go r.run() @@ -55,7 +59,8 @@ type resolver struct { cc grpcresolver.ClientConn interval time.Duration - wg sync.WaitGroup + wg sync.WaitGroup + logger log.Logger } func (r *resolver) Close() { @@ -68,7 +73,7 @@ func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {} func (r *resolver) resolve() error { ctx, cancel := context.WithTimeout(r.ctx, r.interval) defer cancel() - return r.provider.Resolve(ctx, []string{r.target}) + return r.provider.Resolve(ctx, []string{r.target}, false) } func (r *resolver) addresses() []string { @@ -78,16 +83,25 @@ func (r *resolver) addresses() []string { func (r *resolver) run() { defer r.wg.Done() for { - if err := r.resolve(); err != nil { - r.cc.ReportError(err) - } else { + func() { + if err := r.resolve(); err != nil { + r.cc.ReportError(err) + r.logger.Log("msg", "failed to resolve", "err", err) + return + } state := grpcresolver.State{} - for _, addr := range r.addresses() { - raddr := grpcresolver.Address{Addr: addr} - state.Addresses = append(state.Addresses, raddr) + addrs := r.addresses() + if len(addrs) == 0 { + r.logger.Log("msg", "no addresses resolved", "target", r.target) + return } - _ = r.cc.UpdateState(state) - } + for _, addr := range addrs { + state.Addresses = append(state.Addresses, grpcresolver.Address{Addr: addr}) + } + if err := r.cc.UpdateState(state); err != nil { + r.logger.Log("msg", "failed to update state", "err", err) + } + }() select { case <-r.ctx.Done(): return diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/provider.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/provider.go index 8f42bf4d26..f9c1c7f583 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/provider.go +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/provider.go @@ -110,7 +110,7 @@ func GetQTypeName(addr string) (qtype, name string) { // Resolve stores a list of provided addresses or their DNS records if requested. // Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV). // For non-SRV records, it will return an error if a port is not supplied. -func (p *Provider) Resolve(ctx context.Context, addrs []string) error { +func (p *Provider) Resolve(ctx context.Context, addrs []string, flushOld bool) error { resolvedAddrs := map[string][]string{} errs := errutil.MultiError{} @@ -129,10 +129,7 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error { errs.Add(err) // The DNS resolution failed. Continue without modifying the old records. p.resolverFailuresCount.Inc() - // Use cached values. - p.RLock() - resolved = p.resolved[addr] - p.RUnlock() + continue } resolvedAddrs[addr] = resolved } @@ -143,13 +140,17 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error { defer p.Unlock() p.resolverAddrs.ResetTx() + if flushOld && len(errs) == 0 { + p.resolved = map[string][]string{} + } for name, addrs := range resolvedAddrs { + p.resolved[name] = addrs + } + for name, addrs := range p.resolved { p.resolverAddrs.WithLabelValues(name).Set(float64(len(addrs))) } p.resolverAddrs.Submit() - p.resolved = resolvedAddrs - return errs.Err() } diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go index 0456a52bf4..1560c55951 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go @@ -58,7 +58,7 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time. } // Resolve stores a list of nodes auto-discovered from the provided addresses. -func (p *Provider) Resolve(ctx context.Context, addresses []string) error { +func (p *Provider) Resolve(ctx context.Context, addresses []string, flushOld bool) error { clusterConfigs := map[string]*clusterConfig{} errs := errutil.MultiError{} @@ -74,13 +74,9 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { errs.Add(err) p.resolverFailuresCount.Inc() - // Use cached values. - p.RLock() - clusterConfigs[address] = p.clusterConfigs[address] - p.RUnlock() - } else { - clusterConfigs[address] = clusterConfig + continue } + clusterConfigs[address] = clusterConfig } p.Lock() @@ -95,7 +91,12 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { p.resolvedAddresses.Submit() p.configVersion.Submit() - p.clusterConfigs = clusterConfigs + if flushOld && len(errs) == 0 { + p.clusterConfigs = map[string]*clusterConfig{} + } + for addr, config := range clusterConfigs { + p.clusterConfigs[addr] = config + } return errs.Err() } diff --git a/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go b/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go index 43d7188fdc..a4e92e9c55 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go @@ -43,16 +43,7 @@ func ParseMetricSelector(input string) ([]*labels.Matcher, error) { return nil, fmt.Errorf("expected type *parser.VectorSelector, got %T", expr) } - matchers := make([]*labels.Matcher, len(vs.LabelMatchers)) - for i, lm := range vs.LabelMatchers { - matchers[i] = &labels.Matcher{ - Type: lm.Type, - Name: lm.Name, - Value: lm.Value, - } - } - - return matchers, nil + return vs.LabelMatchers, nil } func isEmptyNameMatcherErr(err error) bool { diff --git a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go index 1f96f4c666..eeca44db92 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go +++ b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "os" @@ -24,8 +25,8 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/pkg/errors" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" @@ -199,13 +200,15 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe return labels.EmptyLabels(), errors.Wrapf(err, "unmarshal response: %v", string(body)) } var cfg struct { - GlobalConfig config.GlobalConfig `yaml:"global"` + GlobalConfig struct { + ExternalLabels map[string]string `yaml:"external_labels"` + } `yaml:"global"` } if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil { return labels.EmptyLabels(), errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML) } - return cfg.GlobalConfig.ExternalLabels, nil + return labels.FromMap(cfg.GlobalConfig.ExternalLabels), nil } type Flags struct { @@ -687,6 +690,48 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error return b.Data.Version, nil } +// LowestTimestamp returns the lowest timestamp in the TSDB by parsing the /metrics endpoint +// and extracting the prometheus_tsdb_lowest_timestamp_seconds metric from it. +func (c *Client) LowestTimestamp(ctx context.Context, base *url.URL) (int64, error) { + u := *base + u.Path = path.Join(u.Path, "/metrics") + + level.Debug(c.logger).Log("msg", "lowest timestamp", "url", u.String()) + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return 0, errors.Wrap(err, "create request") + } + + span, ctx := tracing.StartSpan(ctx, "/lowest_timestamp HTTP[client]") + defer span.Finish() + + resp, err := c.Do(req.WithContext(ctx)) + if err != nil { + return 0, errors.Wrapf(err, "request metric against %s", u.String()) + } + defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "request body") + + var parser expfmt.TextParser + families, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + return 0, errors.Wrapf(err, "parsing metric families against %s", u.String()) + } + mf, ok := families["prometheus_tsdb_lowest_timestamp_seconds"] + if !ok { + return 0, errors.Wrapf(err, "metric families did not contain 'prometheus_tsdb_lowest_timestamp_seconds'") + } + val := 1000 * mf.GetMetric()[0].GetGauge().GetValue() + + // in the case that we dont have cut a block yet, TSDB lowest timestamp is math.MaxInt64 + // but its represented as float and truncated so we need to do this weird comparison. + // Since we use this for fan-out pruning we use min timestamp here to include this prometheus. + if val == float64(math.MaxInt64) { + return math.MinInt64, nil + } + return int64(val), nil +} + func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go b/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go index c625cab5ba..77a74c9a6f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go +++ b/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go @@ -126,12 +126,10 @@ func (r *remoteEngine) MinT() int64 { highestMintByLabelSet[key] = lset.MinTime continue } - if lset.MinTime > lsetMinT { highestMintByLabelSet[key] = lset.MinTime } } - var mint int64 = math.MaxInt64 for _, m := range highestMintByLabelSet { if m < mint { @@ -190,7 +188,6 @@ func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos { labelpb.ZLabelsFromPromLabels(builder.Labels())), ) } - return infos } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 7ab47f4cbe..d9940221ff 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -922,17 +922,15 @@ func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo { sort.Slice(infos, func(i, j int) bool { return infos[i].MinTime < infos[j].MinTime }) cur := infos[0] - for i, info := range infos { + for _, info := range infos { if info.MinTime > cur.MaxTime { res = append(res, cur) cur = info continue } cur.MaxTime = info.MaxTime - if i == len(infos)-1 { - res = append(res, cur) - } } + res = append(res, cur) } return res @@ -1574,6 +1572,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store tenant, ) + defer blockClient.Close() + g.Go(func() error { span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ @@ -1688,10 +1688,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.seriesBlocksQueried.WithLabelValues(tenant).Observe(float64(stats.blocksQueried)) } + lt := NewProxyResponseLoserTree(respSets...) + defer lt.Close() // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { begin := time.Now() - set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...)) + set := NewResponseDeduplicator(lt) i := 0 for set.Next() { i++ @@ -3379,7 +3381,6 @@ func (r *bucketIndexReader) Close() error { } func (b *blockSeriesClient) CloseSend() error { - b.Close() return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index d7282384d8..472fff10b4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -982,7 +982,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.35.2-0.20241029125830-62038110b1bc +# github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block