From c3db6ffc5b8c7cf19c52407bb075b9679dde1fff Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 29 Mar 2024 14:06:49 -0700 Subject: [PATCH] [dynamodb] Add max attempts to `processShard` --- lib/kafkalib/writer.go | 2 +- sources/dynamodb/shard.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index e7bbca64..6e20bb4d 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -96,7 +96,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e var kafkaErr error chunk := iter.Next() - for attempts := 0; attempts < 10; attempts++ { + for attempts := range 10 { if attempts > 0 { sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1) slog.Info("Failed to publish to kafka", diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index 2d421c55..0a1a4e44 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -2,6 +2,7 @@ package dynamodb import ( "context" + "fmt" "log/slog" "time" @@ -15,6 +16,8 @@ import ( "github.com/artie-labs/reader/lib/writer" ) +const maxAttempts = 50 + func (s *StreamStore) ListenToChannel(ctx context.Context, _writer writer.Writer) { for shard := range s.shardChan { go s.processShard(ctx, shard, _writer) @@ -110,6 +113,10 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S attempts += 1 } + if attempts >= maxAttempts { + logger.Panic(fmt.Sprintf("Failed to publish messages after %d attempts, exiting...", attempts)) + } + time.Sleep(jitter.Jitter(jitterSleepBaseMs, jitter.DefaultMaxMs, attempts)) shardIterator = getRecordsOutput.NextShardIterator