Skip to content

Commit

Permalink
fix: Fixed race during update, improved infinite run logging and read…
Browse files Browse the repository at this point in the history
…ability.

Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka committed Sep 27, 2024
1 parent ab115e0 commit 9abed95
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
6 changes: 6 additions & 0 deletions metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,12 @@ func (c *Collector) Run() error {
go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState)
go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState)

// Mark best-effort update, so remote write knows (if enabled).
select {
case c.updateNotifyCh <- struct{}{}:
default:
}

<-c.stopCh
return nil
}
Expand Down
48 changes: 34 additions & 14 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func cloneRequest(r *http.Request) *http.Request {
}

func (c *Client) write(ctx context.Context) error {
select {
// Wait for update first as write and collector.Run runs simultaneously.
case <-c.config.UpdateNotify:
case <-ctx.Done():
return ctx.Err()
}

tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder)
if err != nil {
return err
Expand All @@ -125,25 +132,39 @@ func (c *Client) write(ctx context.Context) error {
merr = &errors.MultiError{}
)

log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
shouldRunForever := c.config.RequestCount == -1
if shouldRunForever {
log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n",
len(tss), c.config.BatchSize, c.config.RequestInterval)
} else {
log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n",
len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
}

ticker := time.NewTicker(c.config.RequestInterval)
defer ticker.Stop()
left := c.config.RequestCount // left equal to -1 means infinite amount of requests.
for left != 0 {
left--

for i := 0; ; {
if ctx.Err() != nil {
return ctx.Err()
}

// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
if len(c.config.PprofURLs) > 0 && c.config.RequestCount != -1 && left == c.config.RequestCount/2 {
wgPprof.Add(1)
go func() {
download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05"))
wgPprof.Done()
}()
if !shouldRunForever {
if i < c.config.RequestCount {
break
}
i++
// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
if len(c.config.PprofURLs) > 0 && i == c.config.RequestCount/2 {
wgPprof.Add(1)
go func() {
download.URLs(c.config.PprofURLs, time.Now().Format("2-Jan-2006-15:04:05"))
wgPprof.Done()
}()
}
}

<-ticker.C
select {
case <-c.config.UpdateNotify:
Expand All @@ -168,8 +189,7 @@ func (c *Client) write(ctx context.Context) error {
req := &prompb.WriteRequest{
Timeseries: tss[i:end],
}
err := c.Store(context.TODO(), req)
if err != nil {
if err := c.Store(context.TODO(), req); err != nil {
merr.Add(err)
return
}
Expand Down

0 comments on commit 9abed95

Please sign in to comment.