diff --git a/metrics/serve.go b/metrics/serve.go index 7bff8d8..835f9e0 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -517,6 +517,12 @@ func (c *Collector) Run() error { go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState) go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState) + // Mark best-effort update, so remote write knows (if enabled). + select { + case c.updateNotifyCh <- struct{}{}: + default: + } + <-c.stopCh return nil } diff --git a/metrics/write.go b/metrics/write.go index 9e15a6c..9cecd02 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -110,6 +110,13 @@ func cloneRequest(r *http.Request) *http.Request { } func (c *Client) write(ctx context.Context) error { + select { + // Wait for update first as write and collector.Run runs simultaneously. + case <-c.config.UpdateNotify: + case <-ctx.Done(): + return ctx.Err() + } + tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) if err != nil { return err @@ -125,25 +132,39 @@ func (c *Client) write(ctx context.Context) error { merr = &errors.MultiError{} ) - log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + shouldRunForever := c.config.RequestCount == -1 + if shouldRunForever { + log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.BatchSize, c.config.RequestInterval) + } else { + log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + } + ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() - left := c.config.RequestCount // left equal to -1 means infinite amount of requests. - for left != 0 { - left-- + + for i := 0; ; { if ctx.Err() != nil { return ctx.Err() } - // Download the pprofs during half of the iteration to get avarege readings. - // Do that only when it is not set to take profiles at a given interval. - if len(c.config.PprofURLs) > 0 && c.config.RequestCount != -1 && left == c.config.RequestCount/2 { - wgPprof.Add(1) - go func() { - download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05")) - wgPprof.Done() - }() + if !shouldRunForever { + if i < c.config.RequestCount { + break + } + i++ + // Download the pprofs during half of the iteration to get avarege readings. + // Do that only when it is not set to take profiles at a given interval. + if len(c.config.PprofURLs) > 0 && i == c.config.RequestCount/2 { + wgPprof.Add(1) + go func() { + download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05")) + wgPprof.Done() + }() + } } + <-ticker.C select { case <-c.config.UpdateNotify: @@ -168,8 +189,7 @@ func (c *Client) write(ctx context.Context) error { req := &prompb.WriteRequest{ Timeseries: tss[i:end], } - err := c.Store(context.TODO(), req) - if err != nil { + if err := c.Store(context.TODO(), req); err != nil { merr.Add(err) return }