diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index aba6b189..0f7a2010 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -23,13 +23,22 @@ func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer } func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer) { - var attempts int - // Is there another go-routine processing this shard? if s.storage.GetShardProcessing(*shard.ShardId) { return } + if shard.ParentShardId != nil { + parentID := *shard.ParentShardId + if !s.storage.GetShardProcessed(parentID) && s.storage.GetShardProcessing(parentID) { + slog.Info("Parent shard is being processed, let's sleep and retry", slog.String("shardId", *shard.ShardId)) + + time.Sleep(jitter.Jitter(500, jitter.DefaultMaxMs, 0)) + s.processShard(ctx, shard, writer) + return + } + } + // If no one is processing it, let's mark it as being processed. s.storage.SetShardProcessing(*shard.ShardId) if s.storage.GetShardProcessed(*shard.ShardId) { @@ -103,6 +112,7 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S logger.Panic("Failed to publish messages, exiting...", slog.Any("err", err)) } + var attempts int if len(getRecordsOutput.Records) > 0 { attempts = 0 lastRecord := getRecordsOutput.Records[len(getRecordsOutput.Records)-1]