diff --git a/lib/ttlmap/ttlmap.go b/lib/ttlmap/ttlmap.go index 6439c218..2bdb3169 100644 --- a/lib/ttlmap/ttlmap.go +++ b/lib/ttlmap/ttlmap.go @@ -59,6 +59,14 @@ type SetArgs struct { DoNotFlushToDisk bool } +func (t *TTLMap) Remove(key string) { + t.mu.Lock() + defer t.mu.Unlock() + + delete(t.data, key) + t.shouldSave = true +} + func (t *TTLMap) Set(setArgs SetArgs, ttl time.Duration) { t.mu.Lock() defer t.mu.Unlock() diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index c26ef617..7b3c10bd 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -17,8 +17,7 @@ import ( "github.com/artie-labs/reader/sources/dynamodb/offsets" ) -// jitterSleepBaseMs - sleep for 50 ms as the base. -const jitterSleepBaseMs = 50 +const jitterSleepBaseMs = 100 const shardScannerInterval = 5 * time.Minute func Load(cfg config.DynamoDB) (sources.Source, bool, error) { diff --git a/sources/dynamodb/offsets/offsets.go b/sources/dynamodb/offsets/offsets.go index 9b8a9e53..721a38f9 100644 --- a/sources/dynamodb/offsets/offsets.go +++ b/sources/dynamodb/offsets/offsets.go @@ -67,6 +67,10 @@ func (o *OffsetStorage) SetShardProcessing(shardID string) { }, ShardExpirationAndBuffer) } +func (o *OffsetStorage) UnsetShardProcessing(shardID string) { + o.ttlMap.Remove(shardProcessingKey(shardID)) +} + func (o *OffsetStorage) GetShardProcessing(shardID string) bool { _, isOk := o.ttlMap.Get(shardProcessingKey(shardID)) return isOk diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index 18062d7d..92542424 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -2,6 +2,7 @@ package dynamodb import ( "context" + "fmt" "log/slog" "time" @@ -16,13 +17,32 @@ import ( "github.com/artie-labs/reader/writers" ) +const maxNumErrs = 25 + func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer) { for shard := range s.shardChan { - go s.processShard(ctx, shard, writer) + go s.processShard(ctx, shard, writer, 0) } } -func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer) { +func (s *StreamStore) reprocessShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer, numErrs int, err error) { + if numErrs > maxNumErrs { + logger.Panic(fmt.Sprintf("Failed to process shard: %s and the max number of attempts have been reached", *shard.ShardId), err) + } + + slog.Warn("Failed to process shard, going to try again...", + slog.Any("err", err), + slog.String("streamArn", s.streamArn), + slog.String("shardId", *shard.ShardId), + slog.Int("numErrs", numErrs), + ) + + // Unset it so we can process it again + s.storage.UnsetShardProcessing(*shard.ShardId) + s.processShard(ctx, shard, writer, numErrs+1) +} + +func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer, numErrs int) { // Is there another go-routine processing this shard? if s.storage.GetShardProcessing(*shard.ShardId) { return @@ -30,11 +50,11 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S if parentID := shard.ParentShardId; parentID != nil { // Have we seen the parent? If so, let's wait for processing to finish - // If we haven't seen the parent, then we can assume this is the parent and we don't need to wait. + // If we haven't seen the parent, then we can assume this is the parent, and we don't need to wait. if s.storage.GetShardSeen(*parentID) && !s.storage.GetShardProcessed(*parentID) { slog.Info("Parent shard is being processed, let's sleep 3s and retry", slog.String("shardId", *shard.ShardId), slog.String("parentShardId", *parentID)) time.Sleep(3 * time.Second) - s.processShard(ctx, shard, writer) + s.processShard(ctx, shard, writer, numErrs) return } } @@ -67,11 +87,7 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S iteratorOutput, err := s.streams.GetShardIterator(iteratorInput) if err != nil { - slog.Warn("Failed to get shard iterator...", - slog.Any("err", err), - slog.String("streamArn", s.streamArn), - slog.String("shardId", *shard.ShardId), - ) + s.reprocessShard(ctx, shard, writer, numErrs, fmt.Errorf("failed to get shard iterator: %w", err)) return } @@ -85,12 +101,8 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S getRecordsOutput, err := s.streams.GetRecords(getRecordsInput) if err != nil { - slog.Warn("Failed to get records from shard iterator...", - slog.Any("err", err), - slog.String("streamArn", s.streamArn), - slog.String("shardId", *shard.ShardId), - ) - break + s.reprocessShard(ctx, shard, writer, numErrs, fmt.Errorf("failed to get records: %w", err)) + return } var messages []lib.RawMessage