Skip to content

Commit

Permalink
Moving DynamoDB files around (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 22, 2024
1 parent fbe4cb8 commit e0fa8ec
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 59 deletions.
12 changes: 3 additions & 9 deletions sources/dynamodb/primary_keys.go → lib/dynamo/primary_keys.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package dynamodb
package dynamo

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// retrievePrimaryKeys is called when we process the DynamoDB table snapshot.
// It is needed because the snapshot is a JSON file that does not indicate which attributes are the partition and sort keys.
func (s *SnapshotStore) retrievePrimaryKeys(ctx context.Context) ([]string, error) {
output, err := s.dynamoDBClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{
TableName: &s.tableName,
})

func RetrievePrimaryKeys(ctx context.Context, client *dynamodb.Client, tableName string) ([]string, error) {
output, err := client.DescribeTable(ctx, &dynamodb.DescribeTableInput{TableName: &tableName})
if err != nil {
return nil, err
}
Expand Down
30 changes: 4 additions & 26 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,15 @@ package dynamodb
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws/arn"
awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/sources"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
)

const (
jitterSleepBaseMs = 100
shardScannerInterval = 5 * time.Minute
"github.com/artie-labs/reader/sources/dynamodb/snapshot"
"github.com/artie-labs/reader/sources/dynamodb/stream"
)

func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error) {
Expand All @@ -39,21 +30,8 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error
}

if cfg.Snapshot {
return &SnapshotStore{
tableName: cfg.TableName,
streamArn: cfg.StreamArn,
cfg: &cfg,
dynamoDBClient: dynamodb.NewFromConfig(_awsCfg),
s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, _awsCfg),
}, false, nil
return snapshot.NewStore(cfg, _awsCfg), false, nil
} else {
return &StreamStore{
tableName: cfg.TableName,
streamArn: cfg.StreamArn,
cfg: &cfg,
storage: offsets.NewStorage(cfg.OffsetFile, nil, nil),
streams: dynamodbstreams.NewFromConfig(_awsCfg),
shardChan: make(chan types.Shard),
}, true, nil
return stream.NewStore(cfg, _awsCfg), true, nil
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,51 @@
package dynamodb
package snapshot

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/writers"
)

type SnapshotStore struct {
// Store is the snapshot-mode source for a DynamoDB table. It bundles the table
// settings with the S3 and DynamoDB clients that NewStore constructs.
type Store struct {
// tableName is copied from cfg.TableName; Run passes it to dynamo.RetrievePrimaryKeys.
tableName string
// streamArn is copied from cfg.StreamArn.
streamArn string
// cfg points at the store's own copy of the DynamoDB config; scanFilesOverBucket reads cfg.SnapshotSettings.
cfg *config.DynamoDB
// s3Client is created for the bucket named in cfg.SnapshotSettings.S3Bucket.
s3Client *s3lib.S3Client
// dynamoDBClient is handed to dynamo.RetrievePrimaryKeys in Run.
dynamoDBClient *dynamodb.Client
}

func (s *SnapshotStore) Close() error {
// NewStore builds a snapshot Store from the DynamoDB settings and the AWS config.
// cfg is received by value, so the returned Store keeps a pointer to its own copy.
func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store {
	store := &Store{
		tableName: cfg.TableName,
		streamArn: cfg.StreamArn,
		cfg:       &cfg,
	}

	// Clients are created in the same order as before: S3 first, then DynamoDB.
	store.s3Client = s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, awsCfg)
	store.dynamoDBClient = dynamodb.NewFromConfig(awsCfg)
	return store
}

// Close is a no-op for the snapshot store and always returns nil.
func (s *Store) Close() error {
return nil
}

func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error {
func (s *Store) Run(ctx context.Context, writer writers.Writer) error {
start := time.Now()
if err := s.scanFilesOverBucket(ctx); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
}

keys, err := s.retrievePrimaryKeys(ctx)
keys, err := dynamo.RetrievePrimaryKeys(ctx, s.dynamoDBClient, s.tableName)
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}
Expand All @@ -58,7 +70,7 @@ func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error {
return nil
}

func (s *SnapshotStore) scanFilesOverBucket(ctx context.Context) error {
func (s *Store) scanFilesOverBucket(ctx context.Context) error {
if len(s.cfg.SnapshotSettings.SpecifiedFiles) > 0 {
// Don't scan because you are already specifying files
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dynamodb
package snapshot

import (
"fmt"
Expand All @@ -7,28 +7,28 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
)

type SnapshotIterator struct {
// Iterator turns exported DynamoDB items received on ch into raw messages.
type Iterator struct {
// ch delivers the exported items, each keyed by attribute name; Next ranges over it.
ch chan map[string]types.AttributeValue
// keys is forwarded to dynamo.NewMessageFromExport for every item.
keys []string
// tableName is forwarded to dynamo.NewMessageFromExport for every item.
tableName string
// batchSize presumably caps how many messages Next returns per call — TODO confirm against Next.
batchSize int32
// done makes HasNext return false once it is set.
done bool
}

func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator {
return &SnapshotIterator{
// NewSnapshotIterator returns an Iterator that reads exported items from ch.
// keys, tblName and batchSize are stored as given; the iterator starts out not done.
func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *Iterator {
	it := new(Iterator)
	it.ch = ch
	it.keys = keys
	it.tableName = tblName
	it.batchSize = batchSize
	return it
}

func (s *SnapshotIterator) HasNext() bool {
// HasNext reports whether the iterator has not yet been marked done.
func (s *Iterator) HasNext() bool {
return !s.done
}

func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) {
func (s *Iterator) Next() ([]lib.RawMessage, error) {
var msgs []lib.RawMessage
for msg := range s.ch {
dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dynamodb
package stream

import (
"context"
Expand All @@ -20,13 +20,13 @@ import (

const maxNumErrs = 25

func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer) {
// ListenToChannel receives shards from s.shardChan until the channel is closed,
// starting one goroutine per shard that calls processShard with an error count of 0.
func (s *Store) ListenToChannel(ctx context.Context, writer writers.Writer) {
	for {
		shard, open := <-s.shardChan
		if !open {
			// The channel was closed; there is nothing left to dispatch.
			return
		}

		go s.processShard(ctx, shard, writer, 0)
	}
}

func (s *StreamStore) reprocessShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int, err error) {
func (s *Store) reprocessShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int, err error) {
if numErrs > maxNumErrs {
logger.Panic(fmt.Sprintf("Failed to process shard: %s and the max number of attempts have been reached", *shard.ShardId), err)
}
Expand All @@ -43,7 +43,7 @@ func (s *StreamStore) reprocessShard(ctx context.Context, shard types.Shard, wri
s.processShard(ctx, shard, writer, numErrs+1)
}

func (s *StreamStore) processShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int) {
func (s *Store) processShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int) {
// Is there another go-routine processing this shard?
if s.storage.GetShardProcessing(*shard.ShardId) {
return
Expand Down
33 changes: 25 additions & 8 deletions sources/dynamodb/stream.go → sources/dynamodb/stream/stream.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package dynamodb
package stream

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
"github.com/artie-labs/reader/writers"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
"github.com/artie-labs/reader/writers"
)

const (
// jitterSleepBaseMs is, judging by its name, a base sleep value in milliseconds; its use is outside this view — TODO confirm.
jitterSleepBaseMs = 100
// shardScannerInterval is the period of the ticker that Run creates.
shardScannerInterval = 5 * time.Minute
)

type StreamStore struct {
type Store struct {
tableName string
streamArn string
cfg *config.DynamoDB
Expand All @@ -24,11 +30,22 @@ type StreamStore struct {
shardChan chan types.Shard
}

func (s *StreamStore) Close() error {
// NewStore builds a stream Store from the DynamoDB settings and the AWS config.
// cfg is received by value, so the returned Store keeps a pointer to its own copy.
func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store {
	store := &Store{
		tableName: cfg.TableName,
		streamArn: cfg.StreamArn,
		cfg:       &cfg,
	}

	// Same construction order as before: streams client, offset storage, then the shard channel.
	store.streams = dynamodbstreams.NewFromConfig(awsCfg)
	store.storage = offsets.NewStorage(cfg.OffsetFile, nil, nil)
	store.shardChan = make(chan types.Shard) // unbuffered
	return store
}

// Close is a no-op for the stream store and always returns nil.
func (s *Store) Close() error {
return nil
}

func (s *StreamStore) Run(ctx context.Context, writer writers.Writer) error {
func (s *Store) Run(ctx context.Context, writer writers.Writer) error {
ticker := time.NewTicker(shardScannerInterval)

// Start to subscribe to the channel
Expand All @@ -53,7 +70,7 @@ func (s *StreamStore) Run(ctx context.Context, writer writers.Writer) error {
}
}

func (s *StreamStore) scanForNewShards(ctx context.Context) error {
func (s *Store) scanForNewShards(ctx context.Context) error {
var exclusiveStartShardId *string
for {
input := &dynamodbstreams.DescribeStreamInput{
Expand Down

0 comments on commit e0fa8ec

Please sign in to comment.