Skip to content

Commit

Permalink
Merge pull request juju#17342 from SimonRichardson/improve-changestre…
Browse files Browse the repository at this point in the history
…am-prunner-log-output

juju#17342

The changestream pruner logs a warning if the changestream isn't keeping up with all the terms in the changestream. Unfortunately, if there aren't any writes going to the changestream, it still reports that it isn't keeping up. This isn't actually true: there just aren't enough changes being added to the changestream for the log message to be useful. Refactoring it to only log when the window of changes has moved will surface the real log messages.

<!-- Why this change is needed and what it does. -->

## Checklist

<!-- If an item is not applicable, use `~strikethrough~`. -->

- [x] Code style: imports ordered, good names, simple structure, etc
- [x] Comments saying why design decisions were made
- [x] Go unit tests, with comments saying what you're testing

## QA steps

```sh
$ juju bootstrap lxd test
$ juju debug-log -m controller
```

The warning log messages should no longer be emitted.


## Links

**Jira card:** JUJU-
  • Loading branch information
jujubot authored May 9, 2024
2 parents 4fb706d + 727ab70 commit 6cfce0f
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 135 deletions.
6 changes: 5 additions & 1 deletion internal/worker/changestreampruner/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (s *baseSuite) expectControllerDBGet() {
}

// expectDBGet registers an expectation on s.dbGetter for a GetDB call with the
// given namespace that yields txnRunner and a nil error.
// NOTE(review): this span comes from a rendered diff, so the direct EXPECT()
// line looks like the pre-change body and the expectDBGetTimes call its
// replacement — confirm against the real file.
func (s *baseSuite) expectDBGet(namespace string, txnRunner coredatabase.TxnRunner) {
s.dbGetter.EXPECT().GetDB(namespace).Return(txnRunner, nil)
s.expectDBGetTimes(namespace, txnRunner, 1)
}

// expectDBGetTimes registers an expectation on s.dbGetter for GetDB calls with
// the given namespace. Each call yields txnRunner and a nil error, and the
// expected call count is set to amount via Times.
func (s *baseSuite) expectDBGetTimes(namespace string, txnRunner coredatabase.TxnRunner, amount int) {
	call := s.dbGetter.EXPECT().GetDB(namespace)
	call.Return(txnRunner, nil).Times(amount)
}

func (s *baseSuite) expectAnyLogs(c *gc.C) {
Expand Down
180 changes: 88 additions & 92 deletions internal/worker/changestreampruner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package changestreampruner

import (
"context"
"math"
"sort"
"time"

"github.com/canonical/sqlair"
Expand All @@ -14,7 +14,6 @@ import (
"github.com/juju/retry"
"gopkg.in/tomb.v2"

"github.com/juju/juju/core/changestream"
coredatabase "github.com/juju/juju/core/database"
)

Expand Down Expand Up @@ -69,6 +68,11 @@ type Pruner struct {
tomb tomb.Tomb

cfg WorkerConfig

// windows holds the last window for each namespace. This is used to
// determine if the change stream is keeping up with the pruner. If the
// watermark is outside of the window, we should log a warning message.
windows map[string]window
}

// New creates a new Pruner.
Expand All @@ -79,7 +83,8 @@ func newWorker(cfg WorkerConfig) (*Pruner, error) {
}

pruner := &Pruner{
cfg: cfg,
cfg: cfg,
windows: make(map[string]window),
}

pruner.tomb.Go(pruner.loop)
Expand Down Expand Up @@ -129,16 +134,6 @@ func (w *Pruner) loop() error {
}
}

// window describes a span of time bounded by start and end.
type window struct {
	start, end time.Time
}

// contains reports whether t falls within the window, treating both the start
// and the end of the window as inclusive.
// Note: This assumes there isn't any clock drift!
func (w *window) contains(t time.Time) bool {
	// Anything earlier than the start is outside the window.
	if t.Before(w.start) {
		return false
	}
	// Otherwise t is inside as long as it is not later than the end.
	return !t.After(w.end)
}

func (w *Pruner) prune() (map[string]int64, error) {
traceEnabled := w.cfg.Logger.IsTraceEnabled()
if traceEnabled {
Expand Down Expand Up @@ -176,9 +171,15 @@ func (w *Pruner) prune() (map[string]int64, error) {
{Namespace: coredatabase.ControllerNS},
}, modelNamespaces...)

// Prune each and every model found in the model list. This
// Prune each and every model found in the model list.
pruned := make(map[string]int64)
modelNames := make(map[string]struct{})
for _, mn := range modelNamespaces {
// Store the model name in a map. We can't use pruned map for the
// tracking of namespaces, because if there is an error we might
// accidentally remove a window for a model that hasn't been deleted.
modelNames[mn.Namespace] = struct{}{}

p, err := w.pruneModel(ctx, mn.Namespace)
if err != nil {
// If there is an error, continue on to the next model, as we don't
Expand All @@ -194,6 +195,15 @@ func (w *Pruner) prune() (map[string]int64, error) {
pruned[mn.Namespace] = p
}

// Ensure we clean up the windows for any models that have been deleted.
// The absence of a model in the modelNames list indicates that the model
// has been deleted and we should remove the window.
for namespace := range w.windows {
if _, ok := modelNames[namespace]; !ok {
delete(w.windows, namespace)
}
}

if traceEnabled {
w.cfg.Logger.Tracef("Finished pruning change log")
}
Expand Down Expand Up @@ -223,19 +233,7 @@ func (w *Pruner) pruneModel(ctx context.Context, namespace string) (int64, error
return pruned, errors.Trace(err)
}

// ChangeLog represents a row from the change_log table.
type ChangeLog struct {
ID int `db:"id"`
}

var (
selectWitnessQuery = sqlair.MustPrepare(`SELECT (controller_id, lower_bound, updated_at) AS (&Watermark.*) FROM change_log_witness;`, Watermark{})

// TODO (stickupkid): This needs to be swapped out for the following query
// once we have a way to use functions in columns.
// SELECT COUNT(*) AS &Result.count FROM change_log WHERE created_at > $M.created_at LIMIT $M.limit;
selectChangeLogQuery = sqlair.MustPrepare(`SELECT id AS &ChangeLog.id FROM change_log WHERE created_at > $M.created_at LIMIT $M.limit;`, ChangeLog{}, sqlair.M{})
)
// selectWitnessQuery selects the controller_id, lower_bound and updated_at
// columns of every row in change_log_witness, mapping them onto Watermark.
var selectWitnessQuery = sqlair.MustPrepare(`SELECT (controller_id, lower_bound, updated_at) AS (&Watermark.*) FROM change_log_witness;`, Watermark{})

func (w *Pruner) locateLowestWatermark(ctx context.Context, tx *sqlair.TX, namespace string) (Watermark, error) {
// Gather all the valid watermarks, post row pruning. These include
Expand All @@ -254,28 +252,32 @@ func (w *Pruner) locateLowestWatermark(ctx context.Context, tx *sqlair.TX, names
// Gather all the watermarks that are within the windowed time period.
// If there are no watermarks within the window, then we can assume
// that the stream is keeping up and we don't need to prune anything.
lowest, ok := w.lowestWatermark(namespace, watermarks, w.cfg.Clock.Now())
if !ok {
// Check to see if the latest change log has a valid log in the last
// window duration, if not, then we can assume that the stream is not
// keeping up and we should log a warning.
var changes []ChangeLog
if err := tx.Query(ctx, selectChangeLogQuery, sqlair.M{
"created_at": w.cfg.Clock.Now().Add(-defaultWindowDuration),
"limit": changestream.DefaultNumTermWatermarks + 1,
}).GetAll(&changes); err != nil && !errors.Is(err, sqlair.ErrNoRows) {
return Watermark{}, errors.Trace(err)
}
// If there are less than the default number of term watermarks, then
// we can assume that the stream is keeping up and we don't need to
// prune anything.
if len(changes) < changestream.DefaultNumTermWatermarks {
return Watermark{}, nil
}
w.cfg.Logger.Warningf("No watermarks within window, check logs to see if the change stream is keeping up")
return Watermark{}, nil
sorted := sortWatermarks(namespace, watermarks)

// Find the first and last watermark in the sorted list, this is our
// window. It should hold the start and the end of the window.
watermarkView := window{
start: sorted[0].UpdatedAt,
end: sorted[len(sorted)-1].UpdatedAt,
}
return lowest, nil

// If the watermark is outside of the window, we should log a warning
// message to indicate that the change stream is not keeping up. Only if
// the watermark is different from the last window, as we don't want to
// spam the logs if there are no changes.
now := w.cfg.Clock.Now()
timeView := window{
start: now.Add(-defaultWindowDuration),
end: now,
}
if !timeView.Contains(watermarkView) && !watermarkView.Equals(w.windows[namespace]) {
w.cfg.Logger.Warningf("namespace %s watermarks %q are outside of window, check logs to see if the change stream is keeping up", namespace, sorted[0].ControllerID)
}

// Save the last window for the next iteration.
w.windows[namespace] = watermarkView

return sorted[0], nil
}

// deleteQuery deletes every change_log row whose id is less than or equal to
// the id supplied in the query arguments.
var deleteQuery = sqlair.MustPrepare(`DELETE FROM change_log WHERE id <= $M.id;`, sqlair.M{})
Expand All @@ -290,50 +292,6 @@ func (w *Pruner) deleteChangeLog(ctx context.Context, tx *sqlair.TX, lowest Wate
return pruned, errors.Trace(err)
}

// lowestWatermark returns the watermark with the smallest lower bound from the
// supplied watermarks. Every watermark whose UpdatedAt falls outside the
// window [now - defaultWindowDuration, now] triggers a warning log, but it is
// still considered when picking the lowest. The boolean result is false when
// nothing was selected, i.e. no watermark had a lower bound below
// math.MaxInt64 (for example when watermarks is empty).
func (w *Pruner) lowestWatermark(namespace string, watermarks []Watermark, now time.Time) (Watermark, bool) {
// view is the time window used to flag stale watermarks. lowest starts out
// with a math.MaxInt64 lower bound, which acts as a "nothing selected yet"
// sentinel.
var (
view = window{
start: now.Add(-defaultWindowDuration),
end: now,
}
lowest = Watermark{
LowerBound: math.MaxInt64,
}
)
for _, watermark := range watermarks {
// TODO (stickupkid): If any watermarks are outside of the window, then
// we should force the falling behind controller to bounce and to try
// again at keeping up. We don't have any mechanism to do this yet,
// adding an item to a table might be a waste of time. If the controller
// isn't inserting into the change log, they're either bouncing all the
// time or are deadlocked. If they're the latter no amount of inserting
// into a table will solve the problem, as they're not reading the
// change log anyway.
// In addition to this, we have no practical experience about what a
// good valid window time is. For now we'll just log a warning for
// visibility, before we solidify the approach.
if !view.contains(watermark.UpdatedAt) {
w.cfg.Logger.Warningf("namespace %s watermarks %q are outside of window, check logs to see if the change stream is keeping up", namespace, watermark.ControllerID)
}

// Keep the watermark with the smallest lower bound seen so far.
if watermark.LowerBound < lowest.LowerBound {
lowest = watermark
}
}

// Nothing was selected: the sentinel is untouched, so no watermark with a
// lower bound below math.MaxInt64 was supplied. It could also mean that
// there are controllers not keeping up or recording their changes.
if lowest.LowerBound == math.MaxInt64 {
return Watermark{}, false
}

return lowest, true
}

// scopedContext returns a context that is in the scope of the worker lifetime.
// It returns a cancellable context that is cancelled when the action has
// completed.
Expand All @@ -342,6 +300,23 @@ func (w *Pruner) scopedContext() (context.Context, context.CancelFunc) {
return w.tomb.Context(ctx), cancel
}

// sortWatermarks orders the supplied watermarks in place, ascending by lower
// bound, and returns the same slice. Watermarks that share a lower bound are
// ordered by their updated-at time, earliest first. The namespace argument is
// not used by the sort.
func sortWatermarks(namespace string, watermarks []Watermark) []Watermark {
	// Zero or one watermark is trivially ordered, so there is nothing to do.
	if len(watermarks) < 2 {
		return watermarks
	}

	sort.Slice(watermarks, func(a, b int) bool {
		left, right := watermarks[a], watermarks[b]
		if left.LowerBound != right.LowerBound {
			return left.LowerBound < right.LowerBound
		}
		// Tie-break on the update time so the earliest entry comes first.
		return left.UpdatedAt.Before(right.UpdatedAt)
	})

	return watermarks
}

// ModelNamespace represents a model and the associated DQlite namespace that it
// uses.
type ModelNamespace struct {
Expand All @@ -355,3 +330,24 @@ type Watermark struct {
LowerBound int64 `db:"lower_bound"`
UpdatedAt time.Time `db:"updated_at"`
}

// window is a span of time bounded by start and end. Both boundaries are
// treated as inclusive by Contains.
type window struct {
	start, end time.Time
}

// Contains reports whether the window o lies entirely within w. Both
// boundaries are inclusive, so a window that shares its start and/or its end
// with w (including a window equal to w) is considered contained.
func (w window) Contains(o window) bool {
	// "Not before" / "not after" gives an inclusive comparison. A strict
	// Before/After check would wrongly reject a window that shares exactly
	// one boundary with w while still accepting an identical window.
	return !o.start.Before(w.start) && !o.end.After(w.end)
}

// Equals reports whether both windows start and end at the same instants.
// The comparison uses time.Time.Equal rather than ==, so two times that
// represent the same instant in different locations compare as equal.
func (w window) Equals(o window) bool {
	return w.start.Equal(o.start) && w.end.Equal(o.end)
}

// String renders the window as "<start> -> <end>" using RFC 3339 timestamps.
func (w window) String() string {
	return w.start.Format(time.RFC3339) + " -> " + w.end.Format(time.RFC3339)
}
Loading

0 comments on commit 6cfce0f

Please sign in to comment.