Skip to content

Commit

Permalink
[DynamoDB] Improve snapshot capabilities (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 1, 2024
1 parent 5be770f commit 45e49c3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
5 changes: 5 additions & 0 deletions lib/s3lib/s3lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (s *S3Client) ListFiles(fp string) ([]S3File, error) {
// It's not a typical JSON file in that it is compressed and it's new line delimited via separated via an array
// Which means we can stream this file row by row to not OOM.
func (s *S3Client) StreamJsonGzipFile(file S3File, ch chan<- dynamodb.ItemResponse) error {
const maxBufferSize = 1024 * 1024 // 1 MB or adjust as needed

defer close(ch)
result, err := s.client.GetObject(&s3.GetObjectInput{
Bucket: file.Bucket,
Expand All @@ -88,6 +90,9 @@ func (s *S3Client) StreamJsonGzipFile(file S3File, ch chan<- dynamodb.ItemRespon
defer gz.Close()

scanner := bufio.NewScanner(gz)
buf := make([]byte, maxBufferSize)
scanner.Buffer(buf, maxBufferSize)

for scanner.Scan() {
line := scanner.Bytes()
var content dynamodb.ItemResponse
Expand Down
3 changes: 2 additions & 1 deletion sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Wri
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

writer.SetRunOnComplete(false)
for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
logFields := []any{
slog.String("fileName", *file.Key),
Expand Down Expand Up @@ -101,5 +102,5 @@ func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Wri
slog.Info("Successfully processed file...", logFields...)
}

return nil
return writer.OnComplete()
}
21 changes: 18 additions & 3 deletions writers/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ type DestinationWriter interface {
type Writer struct {
destinationWriter DestinationWriter
logProgress bool
runOnComplete bool
}

func (w *Writer) SetRunOnComplete(runOnComplete bool) {
w.runOnComplete = runOnComplete
}

func New(destinationWriter DestinationWriter, logProgress bool) Writer {
return Writer{destinationWriter, logProgress}
return Writer{destinationWriter: destinationWriter, logProgress: logProgress, runOnComplete: true}
}

// Write writes all the messages from an iterator to the destination.
Expand Down Expand Up @@ -55,9 +60,19 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
}
}

if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
if w.runOnComplete {
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
}

return count, nil
}

func (w *Writer) OnComplete() error {
if err := w.destinationWriter.OnComplete(); err != nil {
return fmt.Errorf("failed running destination OnComplete: %w", err)
}

return nil
}

0 comments on commit 45e49c3

Please sign in to comment.