
Commit

Merge branch 'master' into moving-mongo-around
Tang8330 committed Aug 19, 2024
2 parents 3f1ac0d + d71d8a5 commit 9c11ce5
Showing 11 changed files with 97 additions and 84 deletions.
7 changes: 7 additions & 0 deletions config/dynamodb.go
@@ -1,7 +1,9 @@
package config

import (
"cmp"
"fmt"
"github.com/artie-labs/reader/constants"

"github.com/artie-labs/transfer/lib/stringutil"

@@ -42,6 +44,11 @@ type SnapshotSettings struct {
// If the files are not specified, that's okay.
// We will scan the folder and then load into `specifiedFiles`
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
BatchSize int32 `yaml:"batchSize"`
}

func (s *SnapshotSettings) GetBatchSize() int32 {
return cmp.Or(s.BatchSize, constants.DefaultBatchSize)
}

func (s *SnapshotSettings) Validate() error {
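
The new GetBatchSize helper relies on cmp.Or (Go 1.22+), which returns the first of its arguments that is not the zero value, so an omitted or zero batchSize in YAML falls back to constants.DefaultBatchSize. Below is a minimal, self-contained sketch of that behavior; the DefaultBatchSize value shown is a hypothetical stand-in, since the real constant lives in github.com/artie-labs/reader/constants and is not part of this diff.

package main

import (
	"cmp"
	"fmt"
)

// Hypothetical stand-in for constants.DefaultBatchSize (actual value not shown in this diff).
const DefaultBatchSize int32 = 5_000

type SnapshotSettings struct {
	BatchSize int32 `yaml:"batchSize"`
}

// GetBatchSize mirrors the new config helper: cmp.Or picks the first non-zero value.
func (s *SnapshotSettings) GetBatchSize() int32 {
	return cmp.Or(s.BatchSize, DefaultBatchSize)
}

func main() {
	fmt.Println((&SnapshotSettings{}).GetBatchSize())               // 5000 (falls back to the default)
	fmt.Println((&SnapshotSettings{BatchSize: 250}).GetBatchSize()) // 250 (explicit value wins)
}
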
24 changes: 13 additions & 11 deletions lib/s3lib/s3lib.go
@@ -49,11 +49,7 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
}

var files []S3File
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
Bucket: bucket,
Prefix: prefix,
})

paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
@@ -69,13 +65,19 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
return files, nil
}

// StreamJsonGzipFile - takes an S3 file in `json.gz` format from DynamoDB's export to S3.
// It's not a typical JSON file: it is compressed and newline-delimited rather than wrapped in an array,
// which means we can stream the file row by row without running out of memory.
func (s *S3Client) StreamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
const maxBufferSize = 1024 * 1024 // 1 MB or adjust as needed

func (s *S3Client) StreamJsonGzipFiles(ctx context.Context, files []S3File, ch chan<- map[string]types.AttributeValue) error {
defer close(ch)
for _, file := range files {
if err := s.streamJsonGzipFile(ctx, file, ch); err != nil {
return fmt.Errorf("failed to read s3: %w", err)
}
}

return nil
}

func (s *S3Client) streamJsonGzipFile(ctx context.Context, file S3File, ch chan<- map[string]types.AttributeValue) error {
const maxBufferSize = 1024 * 1024
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: s.bucketName,
Key: file.Key,
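
StreamJsonGzipFiles is a producer: it iterates over the export files, streams each one row by row, and closes the channel via its deferred close when it is done, even if it returns early on error. A caller therefore runs it in a goroutine and ranges over the channel until it is closed, as the new Run in snapshot.go does. The sketch below illustrates that pattern under stated assumptions: consumeExport is a made-up helper, and client/files are placeholders for a configured *s3lib.S3Client and the snapshot's file list.

package example

import (
	"context"
	"log"

	"github.com/artie-labs/reader/lib/s3lib"
	"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
)

// consumeExport is a hypothetical consumer of StreamJsonGzipFiles.
func consumeExport(ctx context.Context, client *s3lib.S3Client, files []s3lib.S3File) {
	ch := make(chan map[string]types.AttributeValue)
	go func() {
		// The producer closes ch via its deferred close, so the range below always terminates.
		if err := client.StreamJsonGzipFiles(ctx, files, ch); err != nil {
			log.Printf("failed to stream export files: %v", err)
		}
	}()

	for row := range ch {
		// Each row is one DynamoDB item decoded from the newline-delimited json.gz export.
		_ = row // process the item here
	}
}
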
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion sources/dynamodb/offsets/offsets.go
@@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/artie-labs/reader/lib/ttlmap"
"github.com/artie-labs/reader/lib/storage/ttlmap"
)

// ShardExpirationAndBuffer - Buffer for when a shard is closed as the records have a TTL of 24h. However, garbage collection is async.
86 changes: 25 additions & 61 deletions sources/dynamodb/snapshot.go
@@ -4,24 +4,21 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/iterator"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/writers"
)

type SnapshotStore struct {
tableName string
streamArn string
cfg *config.DynamoDB

tableName string
streamArn string
cfg *config.DynamoDB
s3Client *s3lib.S3Client
dynamoDBClient *dynamodb.Client
}
@@ -31,15 +28,33 @@ func (s *SnapshotStore) Close() error {
}

func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error {
start := time.Now()
if err := s.scanFilesOverBucket(ctx); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
}

if err := s.streamAndPublish(ctx, writer); err != nil {
return fmt.Errorf("stream and publish failed: %w", err)
keys, err := s.retrievePrimaryKeys(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

ch := make(chan map[string]types.AttributeValue)
go func() {
if err = s.s3Client.StreamJsonGzipFiles(ctx, s.cfg.SnapshotSettings.SpecifiedFiles, ch); err != nil {
logger.Panic("Failed to read file", slog.Any("err", err))
}
}()

count, err := writer.Write(ctx, NewSnapshotIterator(ch, keys, s.tableName, s.cfg.SnapshotSettings.GetBatchSize()))
if err != nil {
return fmt.Errorf("failed to snapshot: %w", err)
}

slog.Info("Finished snapshotting all the files")
slog.Info("Finished snapshotting",
slog.String("tableName", s.tableName),
slog.Int("scannedTotal", count),
slog.Duration("totalDuration", time.Since(start)),
)
return nil
}

@@ -65,54 +80,3 @@ func (s *SnapshotStore) scanFilesOverBucket(ctx context.Context) error {
s.cfg.SnapshotSettings.SpecifiedFiles = files
return nil
}

func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Writer) error {
keys, err := s.retrievePrimaryKeys(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

writer.SetRunOnComplete(false)
for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
logFields := []any{
slog.String("fileName", *file.Key),
}

slog.Info("Processing file...", logFields...)
ch := make(chan map[string]types.AttributeValue)
go func() {
if err := s.s3Client.StreamJsonGzipFile(ctx, file, ch); err != nil {
logger.Panic("Failed to read file", slog.Any("err", err))
}
}()

var messages []lib.RawMessage
for msg := range ch {
dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
if err != nil {
return fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
}

messages = append(messages, dynamoMsg.RawMessage())
// If there are more than 500k messages, we don't need to wait until the whole file is read.
// We can write what we have and continue reading the file. This is done to prevent OOM errors.
if len(messages) > 500_000 {
if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
return fmt.Errorf("failed to write messages: %w", err)
}

// Clear messages
messages = []lib.RawMessage{}
}
}

// TODO: Create an actual iterator over the files that is passed to the writer.
if _, err = writer.Write(ctx, iterator.Once(messages)); err != nil {
return fmt.Errorf("failed to write messages: %w", err)
}

slog.Info("Successfully processed file...", logFields...)
}

return writer.OnComplete()
}
47 changes: 47 additions & 0 deletions sources/dynamodb/snapshot_iterator.go
@@ -0,0 +1,47 @@
package dynamodb

import (
"fmt"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
)

type SnapshotIterator struct {
ch chan map[string]types.AttributeValue
keys []string
tableName string
batchSize int32
done bool
}

func NewSnapshotIterator(ch chan map[string]types.AttributeValue, keys []string, tblName string, batchSize int32) *SnapshotIterator {
return &SnapshotIterator{
ch: ch,
keys: keys,
tableName: tblName,
batchSize: batchSize,
}
}

func (s *SnapshotIterator) HasNext() bool {
return !s.done
}

func (s *SnapshotIterator) Next() ([]lib.RawMessage, error) {
var msgs []lib.RawMessage
for msg := range s.ch {
dynamoMsg, err := dynamo.NewMessageFromExport(msg, s.keys, s.tableName)
if err != nil {
return nil, fmt.Errorf("failed to cast message from DynamoDB, msg: %v, err: %w", msg, err)
}

msgs = append(msgs, dynamoMsg.RawMessage())
if int32(len(msgs)) >= s.batchSize {
return msgs, nil
}
}

s.done = true
return msgs, nil
}
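
The iterator adapts the streaming channel to the writer's pull-based interface: Next blocks on the channel until it has collected batchSize rows or the producer closes the channel, and only marks itself done once the channel is drained, so the final call may return a short or even empty batch. A hedged sketch of a driver loop follows; drain and handleBatch are hypothetical names, not part of this commit — in the actual code, writers.Writer.Write plays this role.

package dynamodb

import (
	"fmt"

	"github.com/artie-labs/reader/lib"
)

// drain is a hypothetical driver illustrating the HasNext/Next contract of SnapshotIterator.
func drain(iter *SnapshotIterator, handleBatch func([]lib.RawMessage) error) error {
	for iter.HasNext() {
		batch, err := iter.Next()
		if err != nil {
			return fmt.Errorf("failed to read next batch: %w", err)
		}
		if len(batch) == 0 {
			// The channel closed with no leftover rows; the next HasNext call returns false.
			continue
		}
		if err := handleBatch(batch); err != nil {
			return fmt.Errorf("failed to write batch: %w", err)
		}
	}
	return nil
}
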
2 changes: 1 addition & 1 deletion sources/mongo/streaming.go
@@ -15,7 +15,7 @@ import (
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
mongoLib "github.com/artie-labs/reader/lib/mongo"
"github.com/artie-labs/reader/lib/persistedmap"
"github.com/artie-labs/reader/lib/storage/persistedmap"
)

const offsetKey = "offset"
13 changes: 3 additions & 10 deletions writers/writer.go
@@ -18,15 +18,10 @@ type DestinationWriter interface {
type Writer struct {
destinationWriter DestinationWriter
logProgress bool
runOnComplete bool
}

func (w *Writer) SetRunOnComplete(runOnComplete bool) {
w.runOnComplete = runOnComplete
}

func New(destinationWriter DestinationWriter, logProgress bool) Writer {
return Writer{destinationWriter: destinationWriter, logProgress: logProgress, runOnComplete: true}
return Writer{destinationWriter: destinationWriter, logProgress: logProgress}
}

// Write writes all the messages from an iterator to the destination.
@@ -60,10 +55,8 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
}
}

if w.runOnComplete {
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}

return count, nil
