Skip to content

Commit

Permalink
Clean up code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 1, 2024
1 parent e4a6c1c commit 07f8606
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
3 changes: 2 additions & 1 deletion sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Wri
return fmt.Errorf("failed to retrieve primary keys: %w", err)
}

writer.SetRunOnComplete(false)
for _, file := range s.cfg.SnapshotSettings.SpecifiedFiles {
logFields := []any{
slog.String("fileName", *file.Key),
Expand Down Expand Up @@ -101,5 +102,5 @@ func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer writers.Wri
slog.Info("Successfully processed file...", logFields...)
}

return nil
return writer.OnComplete()
}
21 changes: 18 additions & 3 deletions writers/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ type DestinationWriter interface {
type Writer struct {
destinationWriter DestinationWriter
logProgress bool
runOnComplete bool
}

func (w *Writer) SetRunOnComplete(runOnComplete bool) {
w.runOnComplete = runOnComplete
}

func New(destinationWriter DestinationWriter, logProgress bool) Writer {
return Writer{destinationWriter, logProgress}
return Writer{destinationWriter: destinationWriter, logProgress: logProgress, runOnComplete: true}
}

// Write writes all the messages from an iterator to the destination.
Expand Down Expand Up @@ -55,9 +60,19 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
}
}

if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
if w.runOnComplete {
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
}

return count, nil
}

func (w *Writer) OnComplete() error {
if err := w.destinationWriter.OnComplete(); err != nil {
return fmt.Errorf("failed running destination OnComplete: %w", err)
}

return nil
}

0 comments on commit 07f8606

Please sign in to comment.