From c10a3dee35e65295aeace73f26a66ab13c7c7314 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Tue, 2 May 2023 17:33:20 -0700
Subject: [PATCH] [Enhancement] Adding jitter sleep after signaling flush (#86)

---
 go.mod                       |  1 +
 lib/db/mock/db.go            |  1 -
 lib/jitter/sleep.go          | 26 ++++++++++++++++++++++++++
 processes/consumer/kafka.go  |  6 ++++++
 processes/consumer/pubsub.go |  6 ++++++
 5 files changed, 39 insertions(+), 1 deletion(-)
 create mode 100644 lib/jitter/sleep.go

diff --git a/go.mod b/go.mod
index 4ec8fd1b6..e0c10bdad 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/stretchr/testify v1.8.1
 	github.com/viant/bigquery v0.2.1-0.20230129024722-24ed6fd5555f
 	go.mongodb.org/mongo-driver v1.11.0
+	golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3
 	google.golang.org/api v0.103.0
 	gopkg.in/yaml.v3 v3.0.1
 )
diff --git a/lib/db/mock/db.go b/lib/db/mock/db.go
index ece7c388f..db085d0f4 100644
--- a/lib/db/mock/db.go
+++ b/lib/db/mock/db.go
@@ -3,7 +3,6 @@ package mock
 import (
 	"database/sql"
 	"fmt"
-
 	"github.com/artie-labs/transfer/lib/mocks"
 )
 
diff --git a/lib/jitter/sleep.go b/lib/jitter/sleep.go
new file mode 100644
index 000000000..89ea798d8
--- /dev/null
+++ b/lib/jitter/sleep.go
@@ -0,0 +1,26 @@
+package jitter
+
+import (
+	"math"
+
+	"golang.org/x/exp/rand"
+)
+
+// maxMilliSeconds caps the upper bound that JitterMs draws from.
+const maxMilliSeconds = 3500
+
+// JitterMs returns a random integer in [0, min(maxMilliSeconds, baseMilliSeconds * 2^attempts)),
+// intended to be used as a sleep duration in milliseconds.
+// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
+//
+// It returns 0 when the truncated upper bound is not positive (e.g. baseMilliSeconds <= 0),
+// since rand.Intn panics on a non-positive argument.
+func JitterMs(baseMilliSeconds, attempts int) int {
+	// sleep = random_between(0, min(cap, base * 2 ** attempt))
+	upperBound := int(math.Min(maxMilliSeconds, float64(baseMilliSeconds)*math.Pow(2, float64(attempts))))
+	if upperBound <= 0 {
+		return 0
+	}
+
+	return rand.Intn(upperBound)
+}
diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go
index d943a7946..2c4cf5b49 100644
--- a/processes/consumer/kafka.go
+++ b/processes/consumer/kafka.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/tls"
 	"github.com/artie-labs/transfer/lib/artie"
+	"github.com/artie-labs/transfer/lib/jitter"
 	awsCfg "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
 	"github.com/segmentio/kafka-go/sasl/plain"
@@ -104,6 +105,11 @@ func StartConsumer(ctx context.Context, flushChan chan bool) {
 
 				if shouldFlush {
 					flushChan <- true
+					// Jitter-sleep is necessary to allow the flush process to acquire the table lock
+					// If it doesn't then the flush process may be over-exhausted since the lock got acquired by `processMessage(...)`.
+					// This then leads us to make unnecessary flushes.
+					jitterDuration := jitter.JitterMs(500, 1)
+					time.Sleep(time.Duration(jitterDuration) * time.Millisecond)
 				}
 			}
 		}(topic)
diff --git a/processes/consumer/pubsub.go b/processes/consumer/pubsub.go
index c2b01bffc..d18eca83c 100644
--- a/processes/consumer/pubsub.go
+++ b/processes/consumer/pubsub.go
@@ -7,6 +7,7 @@ import (
 	"github.com/artie-labs/transfer/lib/artie"
 	"github.com/artie-labs/transfer/lib/cdc/format"
 	"github.com/artie-labs/transfer/lib/config"
+	"github.com/artie-labs/transfer/lib/jitter"
 	"github.com/artie-labs/transfer/lib/logger"
 	"google.golang.org/api/option"
 	"sync"
@@ -94,6 +95,11 @@ func StartSubscriber(ctx context.Context, flushChan chan bool) {
 
 				if shouldFlush {
 					flushChan <- true
+					// Jitter-sleep is necessary to allow the flush process to acquire the table lock
+					// If it doesn't then the flush process may be over-exhausted since the lock got acquired by `processMessage(...)`.
+					// This then leads us to make unnecessary flushes.
+					jitterDuration := jitter.JitterMs(500, 1)
+					time.Sleep(time.Duration(jitterDuration) * time.Millisecond)
 				}
 			})
 