Skip to content

Commit

Permalink
[dynamodb] Add max attempts to processShard
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 29, 2024
1 parent 646a5e2 commit c3db6ff
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e

var kafkaErr error
chunk := iter.Next()
for attempts := 0; attempts < 10; attempts++ {
for attempts := range 10 {
if attempts > 0 {
sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1)
slog.Info("Failed to publish to kafka",
Expand Down
7 changes: 7 additions & 0 deletions sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

import (
"context"
"fmt"
"log/slog"
"time"

Expand All @@ -15,6 +16,8 @@ import (
"github.com/artie-labs/reader/lib/writer"
)

const maxAttempts = 50

func (s *StreamStore) ListenToChannel(ctx context.Context, _writer writer.Writer) {
for shard := range s.shardChan {
go s.processShard(ctx, shard, _writer)
Expand Down Expand Up @@ -110,6 +113,10 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S
attempts += 1
}

if attempts >= maxAttempts {
logger.Panic(fmt.Sprintf("Failed to publish messages after %d attempts, exiting...", attempts))
}

time.Sleep(jitter.Jitter(jitterSleepBaseMs, jitter.DefaultMaxMs, attempts))

shardIterator = getRecordsOutput.NextShardIterator
Expand Down

0 comments on commit c3db6ff

Please sign in to comment.