diff --git a/sources/dynamodb/primary_keys.go b/lib/dynamo/primary_keys.go similarity index 50% rename from sources/dynamodb/primary_keys.go rename to lib/dynamo/primary_keys.go index 9a569d83..f1b24191 100644 --- a/sources/dynamodb/primary_keys.go +++ b/lib/dynamo/primary_keys.go @@ -1,19 +1,13 @@ -package dynamodb +package dynamo import ( "context" "fmt" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" ) -// retrievePrimaryKeys - This function is called when we process the DynamoDB table snapshot. -// This is because the snapshot is a JSON file, and it does not contain which are the partition and sort keys. -func (s *SnapshotStore) retrievePrimaryKeys(ctx context.Context) ([]string, error) { - output, err := s.dynamoDBClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{ - TableName: &s.tableName, - }) - +func RetrievePrimaryKeys(ctx context.Context, client *dynamodb.Client, tableName string) ([]string, error) { + output, err := client.DescribeTable(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}) if err != nil { return nil, err } diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 4e0afe47..5c643c38 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -3,24 +3,15 @@ package dynamodb import ( "context" "fmt" - "time" "github.com/aws/aws-sdk-go-v2/aws/arn" awsCfg "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" - "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/lib/s3lib" "github.com/artie-labs/reader/sources" - "github.com/artie-labs/reader/sources/dynamodb/offsets" -) - -const ( - jitterSleepBaseMs = 100 - shardScannerInterval = 5 * time.Minute + "github.com/artie-labs/reader/sources/dynamodb/snapshot" + "github.com/artie-labs/reader/sources/dynamodb/stream" ) func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error) { @@ -39,21 +30,8 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error } if cfg.Snapshot { - return &SnapshotStore{ - tableName: cfg.TableName, - streamArn: cfg.StreamArn, - cfg: &cfg, - dynamoDBClient: dynamodb.NewFromConfig(_awsCfg), - s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, _awsCfg), - }, false, nil + return snapshot.NewStore(cfg, _awsCfg), false, nil } else { - return &StreamStore{ - tableName: cfg.TableName, - streamArn: cfg.StreamArn, - cfg: &cfg, - storage: offsets.NewStorage(cfg.OffsetFile, nil, nil), - streams: dynamodbstreams.NewFromConfig(_awsCfg), - shardChan: make(chan types.Shard), - }, true, nil + return stream.NewStore(cfg, _awsCfg), true, nil } } diff --git a/sources/dynamodb/snapshot.go b/sources/dynamodb/snapshot/snapshot.go similarity index 74% rename from sources/dynamodb/snapshot.go rename to sources/dynamodb/snapshot/snapshot.go index e66fe935..72b3ae60 100644 --- a/sources/dynamodb/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -1,4 +1,4 @@ -package dynamodb +package snapshot import ( "context" @@ -6,16 +6,18 @@ import ( "log/slog" "time" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/lib/dynamo" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/reader/lib/s3lib" "github.com/artie-labs/reader/writers" ) -type SnapshotStore struct { +type Store struct { tableName string streamArn string cfg *config.DynamoDB @@ -23,17 +25,27 @@ type SnapshotStore struct { dynamoDBClient *dynamodb.Client } -func (s *SnapshotStore) Close() error { +func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store { + return &Store{ + tableName: cfg.TableName, + streamArn: cfg.StreamArn, + cfg: &cfg, + s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, awsCfg), + dynamoDBClient: dynamodb.NewFromConfig(awsCfg), + } +} + +func (s *Store) Close() error { return nil } -func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error { +func (s *Store) Run(ctx context.Context, writer writers.Writer) error { start := time.Now() if err := s.scanFilesOverBucket(ctx); err != nil { return fmt.Errorf("scanning files over bucket failed: %w", err) } - keys, err := s.retrievePrimaryKeys(ctx) + keys, err := dynamo.RetrievePrimaryKeys(ctx, s.dynamoDBClient, s.tableName) if err != nil { return fmt.Errorf("failed to retrieve primary keys: %w", err) } @@ -58,7 +70,7 @@ func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error { return nil } -func (s *SnapshotStore) scanFilesOverBucket(ctx context.Context) error { +func (s *Store) scanFilesOverBucket(ctx context.Context) error { if len(s.cfg.SnapshotSettings.SpecifiedFiles) > 0 { // Don't scan because you are already specifying files return nil diff --git a/sources/dynamodb/snapshot_iterator.go b/sources/dynamodb/snapshot/snapshot_iterator.go similarity index 77% rename from sources/dynamodb/snapshot_iterator.go rename to sources/dynamodb/snapshot/snapshot_iterator.go index 50ae95de..0b0e061a 100644 --- a/sources/dynamodb/snapshot_iterator.go +++ b/sources/dynamodb/snapshot/snapshot_iterator.go @@ -1,4 +1,4 @@ -package dynamodb +package snapshot import ( "fmt" @@ -7,7 +7,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" ) -type SnapshotIterator struct { +type Iterator struct { ch chan map[string]types.AttributeValue keys []string tableName string @@ -15,8 +15,8 @@ type SnapshotIterator struct { done bool } -func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator { - return &SnapshotIterator{ +func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *Iterator { + return &Iterator{ ch: ch, keys: keys, tableName: tblName, @@ -24,11 +24,11 @@ func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, } } -func (s *SnapshotIterator) HasNext() bool { +func (s *Iterator) HasNext() bool { return !s.done } -func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) { +func (s *Iterator) Next() ([]lib.RawMessage, error) { var msgs []lib.RawMessage for msg := range s.ch { dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName) diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/stream/shard.go similarity index 93% rename from sources/dynamodb/shard.go rename to sources/dynamodb/stream/shard.go index 387f53c3..a12a817f 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/stream/shard.go @@ -1,4 +1,4 @@ -package dynamodb +package stream import ( "context" @@ -20,13 +20,13 @@ import ( const maxNumErrs = 25 -func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer) { +func (s *Store) ListenToChannel(ctx context.Context, writer writers.Writer) { for shard := range s.shardChan { go s.processShard(ctx, shard, writer, 0) } } -func (s *StreamStore) reprocessShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int, err error) { +func (s *Store) reprocessShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int, err error) { if numErrs > maxNumErrs { logger.Panic(fmt.Sprintf("Failed to process shard: %s and the max number of attempts have been reached", *shard.ShardId), err) } @@ -43,7 +43,7 @@ func (s *StreamStore) reprocessShard(ctx context.Context, shard types.Shard, wri s.processShard(ctx, shard, writer, numErrs+1) } -func (s *StreamStore) processShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int) { +func (s *Store) processShard(ctx context.Context, shard types.Shard, writer writers.Writer, numErrs int) { // Is there another go-routine processing this shard? if s.storage.GetShardProcessing(*shard.ShardId) { return diff --git a/sources/dynamodb/stream.go b/sources/dynamodb/stream/stream.go similarity index 78% rename from sources/dynamodb/stream.go rename to sources/dynamodb/stream/stream.go index 9f38790e..0aee6d99 100644 --- a/sources/dynamodb/stream.go +++ b/sources/dynamodb/stream/stream.go @@ -1,4 +1,4 @@ -package dynamodb +package stream import ( "context" @@ -6,15 +6,21 @@ import ( "log/slog" "time" - "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/sources/dynamodb/offsets" - "github.com/artie-labs/reader/writers" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" + + "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/sources/dynamodb/offsets" + "github.com/artie-labs/reader/writers" +) + +const ( + jitterSleepBaseMs = 100 + shardScannerInterval = 5 * time.Minute ) -type StreamStore struct { +type Store struct { tableName string streamArn string cfg *config.DynamoDB @@ -24,11 +30,22 @@ type StreamStore struct { shardChan chan types.Shard } -func (s *StreamStore) Close() error { +func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store { + return &Store{ + tableName: cfg.TableName, + streamArn: cfg.StreamArn, + cfg: &cfg, + streams: dynamodbstreams.NewFromConfig(awsCfg), + storage: offsets.NewStorage(cfg.OffsetFile, nil, nil), + shardChan: make(chan types.Shard), + } +} + +func (s *Store) Close() error { return nil } -func (s *StreamStore) Run(ctx context.Context, writer writers.Writer) error { +func (s *Store) Run(ctx context.Context, writer writers.Writer) error { ticker := time.NewTicker(shardScannerInterval) // Start to subscribe to the channel @@ -53,7 +70,7 @@ func (s *StreamStore) Run(ctx context.Context, writer writers.Writer) error { } } -func (s *StreamStore) scanForNewShards(ctx context.Context) error { +func (s *Store) scanForNewShards(ctx context.Context) error { var exclusiveStartShardId *string for { input := &dynamodbstreams.DescribeStreamInput{