Skip to content

Commit

Permalink
[DynamoDB] Handling DynamoDB network error. (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 15, 2024
1 parent 3960063 commit c541662
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
8 changes: 8 additions & 0 deletions lib/ttlmap/ttlmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ type SetArgs struct {
DoNotFlushToDisk bool
}

// Remove deletes key from the map's data while holding the map's mutex and
// flags the map as needing to be saved (shouldSave). Removing a key that is
// not present leaves the data unchanged but still sets the flag.
func (t *TTLMap) Remove(key string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Both writes happen under the same lock, so their relative order is not observable.
	t.shouldSave = true
	delete(t.data, key)
}

func (t *TTLMap) Set(setArgs SetArgs, ttl time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
3 changes: 1 addition & 2 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
"github.com/artie-labs/reader/sources/dynamodb/offsets"
)

const (
	// jitterSleepBaseMs is the base sleep, in milliseconds (100 ms).
	jitterSleepBaseMs = 100
	// shardScannerInterval is a fixed five-minute interval.
	// NOTE(review): presumably the period between shard scans — confirm against its usage.
	shardScannerInterval = 5 * time.Minute
)

func Load(cfg config.DynamoDB) (sources.Source, bool, error) {
Expand Down
4 changes: 4 additions & 0 deletions sources/dynamodb/offsets/offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (o *OffsetStorage) SetShardProcessing(shardID string) {
}, ShardExpirationAndBuffer)
}

// UnsetShardProcessing removes the "processing" entry for shardID from the
// underlying TTL map.
func (o *OffsetStorage) UnsetShardProcessing(shardID string) {
	processingKey := shardProcessingKey(shardID)
	o.ttlMap.Remove(processingKey)
}

func (o *OffsetStorage) GetShardProcessing(shardID string) bool {
_, isOk := o.ttlMap.Get(shardProcessingKey(shardID))
return isOk
Expand Down
42 changes: 27 additions & 15 deletions sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

import (
"context"
"fmt"
"log/slog"
"time"

Expand All @@ -16,25 +17,44 @@ import (
"github.com/artie-labs/reader/writers"
)

const maxNumErrs = 25

// ListenToChannel receives shards from s.shardChan until the channel is closed
// and starts one goroutine per shard running processShard. Every shard starts
// with an error count of 0.
func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer) {
	for shard := range s.shardChan {
		// Exactly one goroutine per shard; processShard takes the running error count as its last argument.
		go s.processShard(ctx, shard, writer, 0)
	}
}

func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer) {
// reprocessShard is called when an attempt to process shard has failed with err.
//
// If numErrs exceeds maxNumErrs, it hands the error to logger.Panic. Otherwise
// it logs a warning, clears the shard's "processing" marker in storage, and
// calls processShard again with numErrs incremented by one.
func (s *StreamStore) reprocessShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer, numErrs int, err error) {
	shardID := *shard.ShardId

	if numErrs > maxNumErrs {
		panicMsg := fmt.Sprintf("Failed to process shard: %s and the max number of attempts have been reached", shardID)
		logger.Panic(panicMsg, err)
	}

	slog.Warn(
		"Failed to process shard, going to try again...",
		slog.Any("err", err),
		slog.String("streamArn", s.streamArn),
		slog.String("shardId", shardID),
		slog.Int("numErrs", numErrs),
	)

	// processShard returns early while the shard is flagged as processing,
	// so the flag has to be cleared before retrying.
	s.storage.UnsetShardProcessing(shardID)

	nextNumErrs := numErrs + 1
	s.processShard(ctx, shard, writer, nextNumErrs)
}

func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer writers.Writer, numErrs int) {
// Is there another go-routine processing this shard?
if s.storage.GetShardProcessing(*shard.ShardId) {
return
}

if parentID := shard.ParentShardId; parentID != nil {
// Have we seen the parent? If so, let's wait for processing to finish
// If we haven't seen the parent, then we can assume this is the parent and we don't need to wait.
// If we haven't seen the parent, then we can assume this is the parent, and we don't need to wait.
if s.storage.GetShardSeen(*parentID) && !s.storage.GetShardProcessed(*parentID) {
slog.Info("Parent shard is being processed, let's sleep 3s and retry", slog.String("shardId", *shard.ShardId), slog.String("parentShardId", *parentID))
time.Sleep(3 * time.Second)
s.processShard(ctx, shard, writer)
s.processShard(ctx, shard, writer, numErrs)
return
}
}
Expand Down Expand Up @@ -67,11 +87,7 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S

iteratorOutput, err := s.streams.GetShardIterator(iteratorInput)
if err != nil {
slog.Warn("Failed to get shard iterator...",
slog.Any("err", err),
slog.String("streamArn", s.streamArn),
slog.String("shardId", *shard.ShardId),
)
s.reprocessShard(ctx, shard, writer, numErrs, fmt.Errorf("failed to get shard iterator: %w", err))
return
}

Expand All @@ -85,12 +101,8 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S

getRecordsOutput, err := s.streams.GetRecords(getRecordsInput)
if err != nil {
slog.Warn("Failed to get records from shard iterator...",
slog.Any("err", err),
slog.String("streamArn", s.streamArn),
slog.String("shardId", *shard.ShardId),
)
break
s.reprocessShard(ctx, shard, writer, numErrs, fmt.Errorf("failed to get records: %w", err))
return
}

var messages []lib.RawMessage
Expand Down

0 comments on commit c541662

Please sign in to comment.