diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go index eb823645..b82b7359 100644 --- a/sources/mongo/mongo.go +++ b/sources/mongo/mongo.go @@ -51,8 +51,12 @@ func (s *Source) Close() error { func (s *Source) Run(ctx context.Context, writer writers.Writer) error { if s.cfg.Streaming { - iterator := newStreamingIterator(s.db, s.cfg, s.cfg.OffsetsFile) - if _, err := writer.Write(ctx, iterator); err != nil { + iterator, err := newStreamingIterator(ctx, s.db, s.cfg, s.cfg.OffsetsFile) + if err != nil { + return err + } + + if _, err = writer.Write(ctx, iterator); err != nil { return fmt.Errorf("failed to stream: %w", err) } } else { diff --git a/sources/mongo/streaming.go b/sources/mongo/streaming.go index 9e70363e..d0ac1d93 100644 --- a/sources/mongo/streaming.go +++ b/sources/mongo/streaming.go @@ -1,6 +1,7 @@ package mongo import ( + "context" "fmt" "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib" @@ -9,25 +10,41 @@ import ( ) type streaming struct { - db *mongo.Database - cfg config.MongoDB - - offsets *persistedmap.PersistedMap + db *mongo.Database + cfg config.MongoDB + changeStream *mongo.ChangeStream + ctx context.Context + collectionsToWatchMap map[string]bool + offsets *persistedmap.PersistedMap } -func newStreamingIterator(db *mongo.Database, cfg config.MongoDB, filePath string) *streaming { - return &streaming{ - db: db, - cfg: cfg, - offsets: persistedmap.NewPersistedMap(filePath), +func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.MongoDB, filePath string) (*streaming, error) { + collectionsToWatchMap := make(map[string]bool) + for _, collection := range cfg.Collections { + collectionsToWatchMap[collection.Name] = true + } + + cs, err := db.Watch(ctx, mongo.Pipeline{}) + if err != nil { + return nil, fmt.Errorf("failed to start change stream: %w", err) } + + return &streaming{ + db: db, + cfg: cfg, + changeStream: cs, + ctx: ctx, + collectionsToWatchMap: collectionsToWatchMap, + offsets: persistedmap.NewPersistedMap(filePath), + }, nil } func (s *streaming) HasNext() bool { // Streaming mode always has next - return true + return s.changeStream.Next(s.ctx) } func (s *streaming) Next() ([]lib.RawMessage, error) { + return nil, fmt.Errorf("not implemented") }