Skip to content

Commit

Permalink
DynamoDB.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 27, 2024
1 parent 3f110ff commit ae9c5f2
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,22 @@ func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer
}

func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer) {
var attempts int

// Is there another go-routine processing this shard?
if s.storage.GetShardProcessing(*shard.ShardId) {
return
}

if shard.ParentShardId != nil {
parentID := *shard.ParentShardId
if !s.storage.GetShardProcessed(parentID) && s.storage.GetShardProcessing(parentID) {
slog.Info("Parent shard is being processed, let's sleep and retry", slog.String("shardId", *shard.ShardId))

time.Sleep(jitter.Jitter(500, jitter.DefaultMaxMs, 0))
s.processShard(ctx, shard, writer)
return
}
}

// If no one is processing it, let's mark it as being processed.
s.storage.SetShardProcessing(*shard.ShardId)
if s.storage.GetShardProcessed(*shard.ShardId) {
Expand Down Expand Up @@ -103,6 +112,7 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S
logger.Panic("Failed to publish messages, exiting...", slog.Any("err", err))
}

var attempts int
if len(getRecordsOutput.Records) > 0 {
attempts = 0
lastRecord := getRecordsOutput.Records[len(getRecordsOutput.Records)-1]
Expand Down

0 comments on commit ae9c5f2

Please sign in to comment.