From 727ab70dc720392c1e1f6009d4ecba2f10571cd0 Mon Sep 17 00:00:00 2001 From: Simon Richardson Date: Mon, 6 May 2024 14:33:26 +0100 Subject: [PATCH] Improve changestream pruner log output The changestream pruner logs a warning if the changestream isn't keeping up with all the terms in the changestream. Unfortunately, if there aren't any writes going to the changestream, it will still report that it isn't keeping up. This isn't actually true: there just aren't enough changes being added to the changestream for the log message to be useful. Refactoring it to only log when the window of changes has moved will then surface the real log messages. --- .../worker/changestreampruner/package_test.go | 6 +- internal/worker/changestreampruner/worker.go | 180 +++++++++--------- .../worker/changestreampruner/worker_test.go | 161 ++++++++++++---- 3 files changed, 212 insertions(+), 135 deletions(-) diff --git a/internal/worker/changestreampruner/package_test.go b/internal/worker/changestreampruner/package_test.go index 0bc00883fab..f9ddb208590 100644 --- a/internal/worker/changestreampruner/package_test.go +++ b/internal/worker/changestreampruner/package_test.go @@ -71,7 +71,11 @@ func (s *baseSuite) expectControllerDBGet() { } func (s *baseSuite) expectDBGet(namespace string, txnRunner coredatabase.TxnRunner) { - s.dbGetter.EXPECT().GetDB(namespace).Return(txnRunner, nil) + s.expectDBGetTimes(namespace, txnRunner, 1) +} + +func (s *baseSuite) expectDBGetTimes(namespace string, txnRunner coredatabase.TxnRunner, amount int) { + s.dbGetter.EXPECT().GetDB(namespace).Return(txnRunner, nil).Times(amount) } func (s *baseSuite) expectAnyLogs(c *gc.C) { diff --git a/internal/worker/changestreampruner/worker.go b/internal/worker/changestreampruner/worker.go index a4054f242d7..2780f05b2c0 100644 --- a/internal/worker/changestreampruner/worker.go +++ b/internal/worker/changestreampruner/worker.go @@ -5,7 +5,7 @@ package changestreampruner import ( "context" - "math" + "sort" "time" 
"github.com/canonical/sqlair" @@ -14,7 +14,6 @@ import ( "github.com/juju/retry" "gopkg.in/tomb.v2" - "github.com/juju/juju/core/changestream" coredatabase "github.com/juju/juju/core/database" ) @@ -69,6 +68,11 @@ type Pruner struct { tomb tomb.Tomb cfg WorkerConfig + + // windows holds the last window for each namespace. This is used to + // determine if the change stream is keeping up with the pruner. If the + // watermark is outside of the window, we should log a warning message. + windows map[string]window } // New creates a new Pruner. @@ -79,7 +83,8 @@ func newWorker(cfg WorkerConfig) (*Pruner, error) { } pruner := &Pruner{ - cfg: cfg, + cfg: cfg, + windows: make(map[string]window), } pruner.tomb.Go(pruner.loop) @@ -129,16 +134,6 @@ func (w *Pruner) loop() error { } } -type window struct { - start, end time.Time -} - -// contains returns true if the window contains the given time. -// Note: This assumes there isn't any clock drift! -func (w *window) contains(t time.Time) bool { - return t.Compare(w.start) >= 0 && t.Compare(w.end) <= 0 -} - func (w *Pruner) prune() (map[string]int64, error) { traceEnabled := w.cfg.Logger.IsTraceEnabled() if traceEnabled { @@ -176,9 +171,15 @@ func (w *Pruner) prune() (map[string]int64, error) { {Namespace: coredatabase.ControllerNS}, }, modelNamespaces...) - // Prune each and every model found in the model list. This + // Prune each and every model found in the model list. pruned := make(map[string]int64) + modelNames := make(map[string]struct{}) for _, mn := range modelNamespaces { + // Store the model name in a map. We can't use pruned map for the + // tracking of namespaces, because if there is an error we might + // accidentally remove a window for a model that hasn't been deleted. 
+ modelNames[mn.Namespace] = struct{}{} + p, err := w.pruneModel(ctx, mn.Namespace) if err != nil { // If there is an error, continue on to the next model, as we don't @@ -194,6 +195,15 @@ func (w *Pruner) prune() (map[string]int64, error) { pruned[mn.Namespace] = p } + // Ensure we clean up the windows for any models that have been deleted. + // The absence of a model in the modelNames list indicates that the model + // has been deleted and we should remove the window. + for namespace := range w.windows { + if _, ok := modelNames[namespace]; !ok { + delete(w.windows, namespace) + } + } + if traceEnabled { w.cfg.Logger.Tracef("Finished pruning change log") } @@ -223,19 +233,7 @@ func (w *Pruner) pruneModel(ctx context.Context, namespace string) (int64, error return pruned, errors.Trace(err) } -// ChangeLog represents a row from the change_log table. -type ChangeLog struct { - ID int `db:"id"` -} - -var ( - selectWitnessQuery = sqlair.MustPrepare(`SELECT (controller_id, lower_bound, updated_at) AS (&Watermark.*) FROM change_log_witness;`, Watermark{}) - - // TODO (stickupkid): This needs to be swapped out for the following query - // once we have a way to use functions in columns. - // SELECT COUNT(*) AS &Result.count FROM change_log WHERE created_at > $M.created_at LIMIT $M.limit; - selectChangeLogQuery = sqlair.MustPrepare(`SELECT id AS &ChangeLog.id FROM change_log WHERE created_at > $M.created_at LIMIT $M.limit;`, ChangeLog{}, sqlair.M{}) -) +var selectWitnessQuery = sqlair.MustPrepare(`SELECT (controller_id, lower_bound, updated_at) AS (&Watermark.*) FROM change_log_witness;`, Watermark{}) func (w *Pruner) locateLowestWatermark(ctx context.Context, tx *sqlair.TX, namespace string) (Watermark, error) { // Gather all the valid watermarks, post row pruning. These include @@ -254,28 +252,32 @@ func (w *Pruner) locateLowestWatermark(ctx context.Context, tx *sqlair.TX, names // Gather all the watermarks that are within the windowed time period. 
// If there are no watermarks within the window, then we can assume // that the stream is keeping up and we don't need to prune anything. - lowest, ok := w.lowestWatermark(namespace, watermarks, w.cfg.Clock.Now()) - if !ok { - // Check to see if the latest change log has a valid log in the last - // window duration, if not, then we can assume that the stream is not - // keeping up and we should log a warning. - var changes []ChangeLog - if err := tx.Query(ctx, selectChangeLogQuery, sqlair.M{ - "created_at": w.cfg.Clock.Now().Add(-defaultWindowDuration), - "limit": changestream.DefaultNumTermWatermarks + 1, - }).GetAll(&changes); err != nil && !errors.Is(err, sqlair.ErrNoRows) { - return Watermark{}, errors.Trace(err) - } - // If there are less than the default number of term watermarks, then - // we can assume that the stream is keeping up and we don't need to - // prune anything. - if len(changes) < changestream.DefaultNumTermWatermarks { - return Watermark{}, nil - } - w.cfg.Logger.Warningf("No watermarks within window, check logs to see if the change stream is keeping up") - return Watermark{}, nil + sorted := sortWatermarks(namespace, watermarks) + + // Find the first and last watermark in the sorted list, this is our + // window. It should hold the start and the end of the window. + watermarkView := window{ + start: sorted[0].UpdatedAt, + end: sorted[len(sorted)-1].UpdatedAt, } - return lowest, nil + + // If the watermark is outside of the window, we should log a warning + // message to indicate that the change stream is not keeping up. Only if + // the watermark is different from the last window, as we don't want to + // spam the logs if there are no changes. 
+ now := w.cfg.Clock.Now() + timeView := window{ + start: now.Add(-defaultWindowDuration), + end: now, + } + if !timeView.Contains(watermarkView) && !watermarkView.Equals(w.windows[namespace]) { + w.cfg.Logger.Warningf("namespace %s watermarks %q are outside of window, check logs to see if the change stream is keeping up", namespace, sorted[0].ControllerID) + } + + // Save the last window for the next iteration. + w.windows[namespace] = watermarkView + + return sorted[0], nil } var deleteQuery = sqlair.MustPrepare(`DELETE FROM change_log WHERE id <= $M.id;`, sqlair.M{}) @@ -290,50 +292,6 @@ func (w *Pruner) deleteChangeLog(ctx context.Context, tx *sqlair.TX, lowest Wate return pruned, errors.Trace(err) } -func (w *Pruner) lowestWatermark(namespace string, watermarks []Watermark, now time.Time) (Watermark, bool) { - // Select the lower bound of the watermark, only if the updated_at time - // is within a windowed time period. - var ( - view = window{ - start: now.Add(-defaultWindowDuration), - end: now, - } - lowest = Watermark{ - LowerBound: math.MaxInt64, - } - ) - for _, watermark := range watermarks { - // TODO (stickupkid): If any watermarks are outside of the window, then - // we should force the falling behind controller to bounce and to try - // again at keeping up. We don't have any mechanism to do this yet, - // adding a item to table might be a waste of time. If the controller - // isn't inserting into the change log, they're either bouncing all the - // time or are deadlocked. If they're the latter no amount of inserting - // into a table will solve the problem, as they're not reading the - // change log anyway. - // In addition to this, we have no theatre experience about what a - // good valid window time is. For now we'll just log a warning for - // visibility, before we solidify the approach. 
- if !view.contains(watermark.UpdatedAt) { - w.cfg.Logger.Warningf("namespace %s watermarks %q are outside of window, check logs to see if the change stream is keeping up", namespace, watermark.ControllerID) - } - - // Select the lower bound of the watermark. - if watermark.LowerBound < lowest.LowerBound { - lowest = watermark - } - } - - // Nothing was selected, this means that there are no watermarks within - // the windowed time period. It could also mean that potentially there are - // are now controllers not keeping up or recording their changes. - if lowest.LowerBound == math.MaxInt64 { - return Watermark{}, false - } - - return lowest, true -} - // scopedContext returns a context that is in the scope of the worker lifetime. // It returns a cancellable context that is cancelled when the action has // completed. @@ -342,6 +300,23 @@ func (w *Pruner) scopedContext() (context.Context, context.CancelFunc) { return w.tomb.Context(ctx), cancel } +func sortWatermarks(namespace string, watermarks []Watermark) []Watermark { + // If there is only one watermark, just use that one and return out early. + if len(watermarks) == 1 { + return watermarks + } + + // Sort the watermarks by the lower bound. + sort.Slice(watermarks, func(i, j int) bool { + if watermarks[i].LowerBound == watermarks[j].LowerBound { + return watermarks[i].UpdatedAt.Before(watermarks[j].UpdatedAt) + } + return watermarks[i].LowerBound < watermarks[j].LowerBound + }) + + return watermarks +} + // ModelNamespace represents a model and the associated DQlite namespace that it // uses. type ModelNamespace struct { @@ -355,3 +330,24 @@ type Watermark struct { LowerBound int64 `db:"lower_bound"` UpdatedAt time.Time `db:"updated_at"` } + +type window struct { + start, end time.Time +} + +// Contains returns true if the window is equal to, or strictly contains, the given window. 
+func (w window) Contains(o window) bool { + if w.Equals(o) { + return true + } + return w.start.Before(o.start) && w.end.After(o.end) +} + +// Equals returns true if the window is equal to the given window. +func (w window) Equals(o window) bool { + return w.start.Equal(o.start) && w.end.Equal(o.end) +} + +func (w window) String() string { + return w.start.Format(time.RFC3339) + " -> " + w.end.Format(time.RFC3339) +} diff --git a/internal/worker/changestreampruner/worker_test.go b/internal/worker/changestreampruner/worker_test.go index b4f832a0802..d7c702cd325 100644 --- a/internal/worker/changestreampruner/worker_test.go +++ b/internal/worker/changestreampruner/worker_test.go @@ -81,7 +81,7 @@ func (s *workerSuite) TestPruneControllerNS(c *gc.C) { s.insertControllerNodes(c, 1) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1002, UpdatedAt: now.Add(-time.Minute)}) s.truncateChangeLog(c, s.TxnRunner()) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.prune() c.Check(err, jc.ErrorIsNil) @@ -113,7 +113,7 @@ func (s *workerSuite) TestPruneModelList(c *gc.C) { s.expectDBGet(modelUUID.String(), txnRunner) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1002, UpdatedAt: now.Add(-time.Minute)}) s.truncateChangeLog(c, s.TxnRunner()) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.prune() c.Check(err, jc.ErrorIsNil) @@ -146,11 +146,11 @@ func (s *workerSuite) TestPruneModelListWithChangeLogItems(c *gc.C) { s.expectDBGet(modelUUID.String(), txnRunner) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1002, UpdatedAt: now.Add(-time.Minute)}) s.truncateChangeLog(c, s.TxnRunner()) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) s.insertChangeLogWitness(c, 
txnRunner, Watermark{ControllerID: "0", LowerBound: 1003, UpdatedAt: now.Add(-time.Second)}) s.truncateChangeLog(c, txnRunner) - s.insertChangeLogItems(c, txnRunner, 6, now) + s.insertChangeLogItems(c, txnRunner, 0, 6, now) result, err := pruner.prune() c.Check(err, jc.ErrorIsNil) @@ -214,7 +214,9 @@ func (s *workerSuite) TestPruneModelChangeLogWitness(c *gc.C) { func (s *workerSuite) TestPruneModelLogsWarning(c *gc.C) { defer s.setupMocks(c).Finish() - s.expectDBGet("foo", s.TxnRunner()) + // We request the db + + s.expectDBGetTimes("foo", s.TxnRunner(), 3) s.expectClock() s.logger.EXPECT().Warningf("namespace %s watermarks %q are outside of window, check logs to see if the change stream is keeping up", gomock.Any(), gomock.Any()).Do(c.Logf).Times(2) @@ -227,11 +229,31 @@ func (s *workerSuite) TestPruneModelLogsWarning(c *gc.C) { s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "3", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration + time.Minute))}) - s.insertChangeLogItems(c, s.TxnRunner(), 1, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 1, now) result, err := pruner.pruneModel(context.Background(), "foo") c.Check(err, jc.ErrorIsNil) c.Check(result, gc.Equals, int64(1)) + + // Should not prune anything as there are no new changes. Notice that the + // warning is not logged. + + result, err = pruner.pruneModel(context.Background(), "foo") + c.Check(err, jc.ErrorIsNil) + c.Check(result, gc.Equals, int64(0)) + + // Add some new changes and it should log the warning. 
+ + now = time.Now() + + s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 2, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}) + s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "3", LowerBound: 2, UpdatedAt: now.Add(-(defaultWindowDuration + time.Minute))}) + + s.insertChangeLogItems(c, s.TxnRunner(), 1, 1, now) + + result, err = pruner.pruneModel(context.Background(), "foo") + c.Check(err, jc.ErrorIsNil) + c.Check(result, gc.Equals, int64(1)) } func (s *workerSuite) TestPruneModelRemovesChangeLogItems(c *gc.C) { @@ -250,7 +272,7 @@ func (s *workerSuite) TestPruneModelRemovesChangeLogItems(c *gc.C) { s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1002, UpdatedAt: now.Add(-time.Minute)}) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "3", LowerBound: 1003, UpdatedAt: now.Add(-time.Second)}) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.pruneModel(context.Background(), "foo") c.Check(err, jc.ErrorIsNil) @@ -275,7 +297,7 @@ func (s *workerSuite) TestPruneModelRemovesChangeLogItemsWithMultipleWatermarks( s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "0", LowerBound: 1005, UpdatedAt: now.Add(-time.Minute)}) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "1", LowerBound: 1002, UpdatedAt: now.Add(-time.Second)}) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.pruneModel(context.Background(), "foo") c.Check(err, jc.ErrorIsNil) @@ -301,7 +323,7 @@ func (s *workerSuite) TestPruneModelRemovesChangeLogItemsWithMultipleWatermarksW s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "1", LowerBound: 1002, UpdatedAt: now.Add(-time.Second)}) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "2", LowerBound: 1001, UpdatedAt: 
now.Add(-(defaultWindowDuration + time.Second))}) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.pruneModel(context.Background(), "foo") c.Check(err, jc.ErrorIsNil) @@ -327,7 +349,7 @@ func (s *workerSuite) TestPruneModelRemovesChangeLogItemsWithMultipleWatermarksM s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "1", LowerBound: 1002, UpdatedAt: now.Add(-time.Second)}) s.insertChangeLogWitness(c, s.TxnRunner(), Watermark{ControllerID: "2", LowerBound: 1001, UpdatedAt: now.Add(-time.Second)}) - s.insertChangeLogItems(c, s.TxnRunner(), 10, now) + s.insertChangeLogItems(c, s.TxnRunner(), 0, 10, now) result, err := pruner.pruneModel(context.Background(), "foo") c.Check(err, jc.ErrorIsNil) @@ -340,29 +362,72 @@ func (s *workerSuite) TestWindowContains(c *gc.C) { now := time.Now() testCases := []struct { window window - now time.Time + other window expected bool }{{ - window: window{start: now.Add(-time.Minute), end: now}, - now: now, + window: window{start: now, end: now}, + other: window{start: now, end: now}, expected: true, }, { - window: window{start: now.Add(-time.Minute), end: now}, - now: now.Add(-time.Minute), + window: window{start: now.Add(-time.Minute), end: now.Add(time.Minute)}, + other: window{start: now, end: now}, expected: true, }, { - window: window{start: now.Add(-time.Minute), end: now}, - now: now.Add(-(time.Minute * 2)), + window: window{start: now.Add(time.Minute), end: now.Add(-time.Minute)}, + other: window{start: now, end: now}, + expected: false, + }, { + window: window{start: now.Add(time.Minute), end: now.Add(time.Minute)}, + other: window{start: now, end: now}, expected: false, }, { - window: window{start: now.Add(-time.Minute), end: now}, - now: now.Add(time.Minute * 2), + window: window{start: now.Add(-time.Minute), end: now.Add(-time.Minute)}, + other: window{start: now, end: now}, + expected: false, + }, { + window: window{start: now, 
end: now.Add(time.Minute * 2)}, + other: window{start: now.Add(time.Minute), end: now.Add(time.Minute + time.Second)}, + expected: true, + }, { + window: window{start: now, end: now.Add(time.Minute * 2)}, + other: window{start: now.Add(time.Nanosecond), end: now.Add((time.Minute * 2) - time.Nanosecond)}, + expected: true, + }, { + window: window{start: now, end: now.Add(time.Minute * 2)}, + other: window{start: now, end: now.Add((time.Minute * 2) - time.Nanosecond)}, + expected: false, + }, { + window: window{start: now, end: now.Add(time.Minute * 2)}, + other: window{start: now.Add(time.Nanosecond), end: now.Add(time.Minute * 2)}, + expected: false, + }} + for i, test := range testCases { + c.Logf("test %d", i) + + got := test.window.Contains(test.other) + c.Check(got, gc.Equals, test.expected) + } +} + +func (s *workerSuite) TestWindowEquals(c *gc.C) { + now := time.Now() + testCases := []struct { + window window + other window + expected bool + }{{ + window: window{start: now, end: now}, + other: window{start: now, end: now}, + expected: true, + }, { + window: window{start: now.Add(-time.Minute), end: now.Add(time.Minute)}, + other: window{start: now, end: now}, expected: false, }} for i, test := range testCases { c.Logf("test %d", i) - got := test.window.contains(test.now) + got := test.window.Equals(test.other) c.Check(got, gc.Equals, test.expected) } } @@ -375,59 +440,69 @@ func (s *workerSuite) TestLowestWatermark(c *gc.C) { now := time.Now() testCases := []struct { watermarks []Watermark - now time.Time - expected Watermark - expectedOK bool + expected []Watermark }{{ watermarks: []Watermark{ {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, }, - now: now, - expected: Watermark{ControllerID: "0", LowerBound: 1, UpdatedAt: now}, - expectedOK: true, + expected: []Watermark{ + {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, + }, }, { watermarks: []Watermark{ {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, {ControllerID: "1", LowerBound: 1, UpdatedAt: 
now}, }, - now: now, - expected: Watermark{ControllerID: "0", LowerBound: 1, UpdatedAt: now}, - expectedOK: true, + expected: []Watermark{ + {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, + {ControllerID: "1", LowerBound: 1, UpdatedAt: now}, + }, }, { watermarks: []Watermark{ {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, {ControllerID: "1", LowerBound: 10, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, }, - now: now, - expected: Watermark{ControllerID: "0", LowerBound: 1, UpdatedAt: now}, - expectedOK: true, + expected: []Watermark{ + {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, + {ControllerID: "1", LowerBound: 10, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, + }, }, { watermarks: []Watermark{ {ControllerID: "0", LowerBound: 2, UpdatedAt: now}, {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration - time.Second))}, }, - now: now, - expected: Watermark{ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration - time.Second))}, - expectedOK: true, + expected: []Watermark{ + {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration - time.Second))}, + {ControllerID: "0", LowerBound: 2, UpdatedAt: now}, + }, + }, { + watermarks: []Watermark{ + {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, + {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration - time.Second))}, + }, + expected: []Watermark{ + {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration - time.Second))}, + {ControllerID: "0", LowerBound: 1, UpdatedAt: now}, + }, }, { watermarks: []Watermark{ {ControllerID: "0", LowerBound: 2, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, }, - now: now, // TODO (stickupkid): This should be false, but we need a strategy for // removing nodes that are not keeping up. We're logging a warning // instead. 
- expected: Watermark{ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, - expectedOK: true, + expected: []Watermark{ + {ControllerID: "1", LowerBound: 1, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, + {ControllerID: "0", LowerBound: 2, UpdatedAt: now.Add(-(defaultWindowDuration + time.Second))}, + }, }} for i, test := range testCases { c.Logf("test %d", i) - got, ok := s.newPruner(c).lowestWatermark("foo", test.watermarks, test.now) + got := sortWatermarks("foo", test.watermarks) c.Check(got, jc.DeepEquals, test.expected) - c.Check(ok, gc.Equals, test.expectedOK) } } @@ -438,6 +513,7 @@ func (s *workerSuite) newPruner(c *gc.C) *Pruner { Clock: s.clock, Logger: s.logger, }, + windows: make(map[string]window), } } @@ -466,6 +542,7 @@ func (s *workerSuite) insertChangeLogWitness(c *gc.C, runner coredatabase.TxnRun query, err := sqlair.Prepare(` INSERT INTO change_log_witness (controller_id, lower_bound, updated_at) VALUES ($M.ctrl_id, $M.lower_bound, $M.updated_at) +ON CONFLICT (controller_id) DO UPDATE SET lower_bound = $M.lower_bound, updated_at = $M.updated_at; `, sqlair.M{}) c.Assert(err, jc.ErrorIsNil) @@ -483,7 +560,7 @@ VALUES ($M.ctrl_id, $M.lower_bound, $M.updated_at) c.Assert(err, jc.ErrorIsNil) } -func (s *workerSuite) insertChangeLogItems(c *gc.C, runner coredatabase.TxnRunner, amount int, now time.Time) { +func (s *workerSuite) insertChangeLogItems(c *gc.C, runner coredatabase.TxnRunner, start, amount int, now time.Time) { query, err := sqlair.Prepare(` INSERT INTO change_log (id, edit_type_id, namespace_id, changed, created_at) VALUES ($M.id, 4, 2, 0, $M.created_at); @@ -491,7 +568,7 @@ VALUES ($M.id, 4, 2, 0, $M.created_at); c.Assert(err, jc.ErrorIsNil) err = runner.Txn(context.Background(), func(ctx context.Context, tx *sqlair.TX) error { - for i := 0; i < amount; i++ { + for i := start; i < amount; i++ { err := tx.Query(ctx, query, sqlair.M{ "id": i + 1000, "created_at": now,