From 222708d431b1bf4aa51c4b82cf0fc9d432293a66 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 14 Nov 2023 08:32:11 +0200 Subject: [PATCH] Allow setting replica interval offsets If using S3 replicas the extra LIST call on startup may be expensive in scale if a lot of Litestreams are started in quick succession. Allowing configuring the snapshot offset externally allows the caller to spread around the snapshots without relying on access to the remote replica making restarts no-op. Same thing applies to retention checks that they can easily stack too close. --- replica.go | 76 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 17 deletions(-) diff --git a/replica.go b/replica.go index f97d34f2..1e51fe86 100644 --- a/replica.go +++ b/replica.go @@ -54,6 +54,9 @@ type Replica struct { // Frequency to create new snapshots. SnapshotInterval time.Duration + // Time waited before starting the snapshot interval. Skips checking replica to calculate offset. + SnapshotOffset time.Duration + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration @@ -61,9 +64,15 @@ type Replica struct { // Time between checks for retention. RetentionCheckInterval time.Duration + // Time waited before starting the check interval. + RetentionCheckOffset time.Duration + // Time between validation checks. ValidationInterval time.Duration + // Time waited before starting the validation checks. + ValidationOffset time.Duration + // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool @@ -720,6 +729,20 @@ func (r *Replica) retainer(ctx context.Context) { checkInterval = r.Retention } + // Offset retention checks if configured. + if r.RetentionCheckOffset > 0 { + r.Logger().Info("retention check interval adjusted", "next", time.Now().Add(r.RetentionCheckOffset).Format(time.RFC3339)) + + select { + case <-ctx.Done(): + return + case <-time.After(r.RetentionCheckOffset): + if err := r.EnforceRetention(ctx); err != nil { + r.Logger().Error("retainer error", "error", err) + } + } + } + ticker := time.NewTicker(checkInterval) defer ticker.Stop() @@ -743,26 +766,31 @@ func (r *Replica) snapshotter(ctx context.Context) { } logger := r.Logger() - if pos, err := r.db.Pos(); err != nil { - logger.Error("snapshotter cannot determine generation", "error", err) - } else if !pos.IsZero() { - if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil { - logger.Error("snapshotter cannot determine latest snapshot", "error", err) - } else if snapshot != nil { - nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt) - if nextSnapshot < 0 { - nextSnapshot = 0 + if r.SnapshotOffset == 0 { + if pos, err := r.db.Pos(); err != nil { + logger.Error("snapshotter cannot determine generation", "error", err) + } else if !pos.IsZero() { + if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil { + logger.Error("snapshotter cannot determine latest snapshot", "error", err) + } else if snapshot != nil { + r.SnapshotOffset = r.SnapshotInterval - time.Since(snapshot.CreatedAt) + // ensure we will snapshot immediately if zero than less + if r.SnapshotOffset < 1 { + r.SnapshotOffset = 1 + } } + } + } - logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String()) + if r.SnapshotOffset > 0 { + logger.Info("snapshot interval adjusted", "next", time.Now().Add(r.SnapshotOffset).Format(time.RFC3339)) - select { - case <-ctx.Done(): - return - case <-time.After(nextSnapshot): - if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { - logger.Error("snapshotter error", "error", err) - } + select { + case <-ctx.Done(): + return + case <-time.After(r.SnapshotOffset): + if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { + logger.Error("snapshotter error", "error", err) } } } @@ -795,6 +823,20 @@ func (r *Replica) validator(ctx context.Context) { return } + // Offset validations if configured. + if r.ValidationOffset > 0 { + r.Logger().Info("validation interval adjusted", "next", time.Now().Add(r.ValidationInterval).Format(time.RFC3339)) + + select { + case <-ctx.Done(): + return + case <-time.After(r.ValidationOffset): + if err := r.Validate(ctx); err != nil { + r.Logger().Error("validation error", "error", err) + } + } + } + ticker := time.NewTicker(r.ValidationInterval) defer ticker.Stop()