From a74d8bae28a41b88e7bc560196f0d51bf0c11d20 Mon Sep 17 00:00:00 2001 From: Elijah Parker <114941210+eparker-tulip@users.noreply.github.com> Date: Mon, 16 Dec 2024 08:47:14 -0600 Subject: [PATCH] Add metric for catchup failure and increase catchup time to 2 minutes (#91) Previously the max catchup time default was 1 minute, which wasn't always enough to recover from a pod restart. This 1. doubles the time to 2 minutes 2. and also increases the dedup key TTL from 2m to 2.5m to allow for covering the catchup time without greatly increasing the number of keys (25% increase in total dedup keys, same create/expire rate). 3. Also added is `resume_failed` metric that increments each time it cannot catchup from where it left off. 4. Logging is also improved with the addition of last processed age in seconds. --- default.nix | 2 +- lib/config/main.go | 4 ++-- lib/config/main_test.go | 20 ++++++++++---------- lib/oplog/tail.go | 27 ++++++++++++++++++++++----- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/default.nix b/default.nix index 3e335bd..ca859b4 100644 --- a/default.nix +++ b/default.nix @@ -2,7 +2,7 @@ buildGoModule { pname = "oplogtoredis"; - version = "3.8.0"; + version = "3.8.1"; src = builtins.path { path = ./.; }; postInstall = '' diff --git a/lib/config/main.go b/lib/config/main.go index d857cb5..df3412f 100644 --- a/lib/config/main.go +++ b/lib/config/main.go @@ -16,8 +16,8 @@ type oplogtoredisConfiguration struct { HTTPServerAddr string `default:"0.0.0.0:9000" envconfig:"HTTP_SERVER_ADDR"` BufferSize int `default:"10000" split_words:"true"` TimestampFlushInterval time.Duration `default:"1s" split_words:"true"` - MaxCatchUp time.Duration `default:"60s" split_words:"true"` - RedisDedupeExpiration time.Duration `default:"120s" split_words:"true"` + MaxCatchUp time.Duration `default:"120s" split_words:"true"` + RedisDedupeExpiration time.Duration `default:"150s" split_words:"true"` RedisMetadataPrefix string `default:"oplogtoredis::" 
split_words:"true"` MongoConnectTimeout time.Duration `default:"10s" split_words:"true"` MongoQueryTimeout time.Duration `default:"5s" split_words:"true"` diff --git a/lib/config/main_test.go b/lib/config/main_test.go index 2957f12..0fd2471 100644 --- a/lib/config/main_test.go +++ b/lib/config/main_test.go @@ -45,8 +45,8 @@ var envTests = map[string]struct { HTTPServerAddr: "0.0.0.0:9000", BufferSize: 10000, TimestampFlushInterval: time.Second, - MaxCatchUp: time.Minute, - RedisDedupeExpiration: 2 * time.Minute, + MaxCatchUp: 2 * time.Minute, + RedisDedupeExpiration: 2 * time.Minute + 30 * time.Second, RedisMetadataPrefix: "oplogtoredis::", }, }, @@ -109,42 +109,42 @@ func TestParseEnv(t *testing.T) { func checkConfigExpectation(t *testing.T, expectedConfig *oplogtoredisConfiguration) { if expectedConfig.MongoURL != MongoURL() { - t.Errorf("Incorrect Mongo URL. Got \"%s\", Expected \"%s\"", + t.Errorf("Incorrect Mongo URL. Expected \"%s\", Got \"%s\"", expectedConfig.MongoURL, MongoURL()) } if expectedConfig.RedisURL != strings.Join(RedisURL()[:], "") { - t.Errorf("Incorrect Redis URL. Got \"%s\", Expected \"%s\"", + t.Errorf("Incorrect Redis URL. Expected \"%s\", Got \"%s\"", expectedConfig.RedisURL, RedisURL()) } if expectedConfig.HTTPServerAddr != HTTPServerAddr() { - t.Errorf("Incorrect HTTPServerAddr. Got \"%s\", Expected \"%s\"", + t.Errorf("Incorrect HTTPServerAddr. Expected \"%s\", Got \"%s\"", expectedConfig.HTTPServerAddr, HTTPServerAddr()) } if expectedConfig.BufferSize != BufferSize() { - t.Errorf("Incorrect BufferSize. Got %d, Expected %d", + t.Errorf("Incorrect BufferSize. Expected %d, Got %d", expectedConfig.BufferSize, BufferSize()) } if expectedConfig.TimestampFlushInterval != TimestampFlushInterval() { - t.Errorf("Incorrect TimestampFlushInterval. Got %d, Expected %d", + t.Errorf("Incorrect TimestampFlushInterval. 
Expected %d, Got %d", expectedConfig.TimestampFlushInterval, TimestampFlushInterval()) } if expectedConfig.MaxCatchUp != MaxCatchUp() { - t.Errorf("Incorrect MaxCatchUp. Got %d, Expected %d", + t.Errorf("Incorrect MaxCatchUp. Expected %d, Got %d", expectedConfig.MaxCatchUp, MaxCatchUp()) } if expectedConfig.RedisDedupeExpiration != RedisDedupeExpiration() { - t.Errorf("Incorrect RedisDedupeExpiration. Got %d, Expected %d", + t.Errorf("Incorrect RedisDedupeExpiration. Expected %d, Got %d", expectedConfig.RedisDedupeExpiration, RedisDedupeExpiration()) } if expectedConfig.RedisMetadataPrefix != RedisMetadataPrefix() { - t.Errorf("Incorrect RedisMetadataPrefix. Got \"%s\", Expected \"%s\"", + t.Errorf("Incorrect RedisMetadataPrefix. Expected \"%s\", Got \"%s\"", expectedConfig.RedisMetadataPrefix, RedisMetadataPrefix()) } } diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 7ca30e9..749164f 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -103,6 +103,14 @@ var ( Name: "last_received_staleness", Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.", }, []string{"ordinal"}) + + metricOplogResumeGap = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "otr", + Subsystem: "oplog", + Name: "resume_gap_seconds", + Help: "Histogram recording the gap in time that a tailing resume had to catchup and whether it was successful or not.", + Buckets: []float64{1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000}, + }, []string{"status"}) ) func init() { @@ -440,25 +448,34 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry // Get the earliest "last processed time" for each shard. This assumes that the number of shards is constant. 
ts, tsTime, redisErr := redispub.FirstLastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, maxOrdinal) + gapSeconds := time.Since(tsTime) / time.Second + if redisErr == nil { - // we have a last write time, check that it's not too far in the - // past + // we have a last write time, check that it's not too far in the past if tsTime.After(time.Now().Add(-1 * tailer.MaxCatchUp)) { - log.Log.Infof("Found last processed timestamp, resuming oplog tailing from %d", tsTime.Unix()) + log.Log.Infow("Found last processed timestamp, resuming oplog tailing", + "timestamp", tsTime.Unix(), + "age_seconds", gapSeconds) + metricOplogResumeGap.WithLabelValues("success").Observe(float64(gapSeconds)) return ts } - log.Log.Warnf("Found last processed timestamp, but it was too far in the past (%d). Will start from end of oplog", tsTime.Unix()) + log.Log.Warnw("Found last processed timestamp, but it was too far in the past. Will start from end of oplog", + "timestamp", tsTime.Unix(), + "age_seconds", gapSeconds) } if (redisErr != nil) && (redisErr != redis.Nil) { log.Log.Errorw("Error querying Redis for last processed timestamp. Will start from end of oplog.", "error", redisErr) } + + metricOplogResumeGap.WithLabelValues("failed").Observe(float64(gapSeconds)) mongoOplogEndTimestamp, mongoErr := getTimestampOfLastOplogEntry() if mongoErr == nil { - log.Log.Infof("Starting tailing from end of oplog (timestamp %d)", mongoOplogEndTimestamp.T) + log.Log.Infow("Starting tailing from end of oplog", + "timestamp", mongoOplogEndTimestamp.T) return *mongoOplogEndTimestamp }