diff --git a/sources/mongo/streaming.go b/sources/mongo/streaming.go index d3f1661a..0ea4e444 100644 --- a/sources/mongo/streaming.go +++ b/sources/mongo/streaming.go @@ -9,9 +9,12 @@ import ( "github.com/artie-labs/reader/lib/persistedmap" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "time" ) +const offsetKey = "offset" + type streaming struct { db *mongo.Database cfg config.MongoDB @@ -38,8 +41,19 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo }}}, } - // TODO: Full document if available. - cs, err := db.Watch(ctx, pipeline) + opts := options.ChangeStream() + storage := persistedmap.NewPersistedMap(filePath) + if resumeToken, exists := storage.Get(offsetKey); exists { + castedResumeToken, isOk := resumeToken.(bson.Raw) + if !isOk { + return nil, fmt.Errorf("expected resume token to be bson.Raw, got: %T", resumeToken) + } + + opts.SetResumeAfter(castedResumeToken) + } + + // TODO: Think about full document delete + cs, err := db.Watch(ctx, pipeline, opts) if err != nil { return nil, fmt.Errorf("failed to start change stream: %w", err) } @@ -52,7 +66,7 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo changeStream: cs, ctx: ctx, collectionsToWatchMap: collectionsToWatchMap, - offsets: persistedmap.NewPersistedMap(filePath), + offsets: storage, }, nil } @@ -63,14 +77,13 @@ func (s *streaming) HasNext() bool { func (s *streaming) Next() ([]lib.RawMessage, error) { var rawMsgs []lib.RawMessage - - // Check for new events if s.batchSize > int32(len(rawMsgs)) && s.changeStream.Next(s.ctx) { var changeEvent bson.M if err := s.changeStream.Decode(&changeEvent); err != nil { return nil, fmt.Errorf("failed to decode change event: %v", err) } + s.offsets.Set(offsetKey, s.changeStream.ResumeToken()) ns, isOk := changeEvent["ns"] if !isOk { return nil, fmt.Errorf("failed to get namespace from change event: %v", changeEvent)