Skip to content

Commit

Permalink
All working.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 1, 2024
1 parent c0389d7 commit 8b035cb
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions sources/mongo/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"github.com/artie-labs/reader/lib/persistedmap"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

const offsetKey = "offset"

type streaming struct {
db *mongo.Database
cfg config.MongoDB
Expand All @@ -38,8 +41,19 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
}}},
}

// TODO: Full document if available.
cs, err := db.Watch(ctx, pipeline)
opts := options.ChangeStream()
storage := persistedmap.NewPersistedMap(filePath)
if resumeToken, exists := storage.Get(offsetKey); exists {
castedResumeToken, isOk := resumeToken.(bson.Raw)
if !isOk {
return nil, fmt.Errorf("expected resume token to be bson.Raw, got: %T", resumeToken)
}

opts.SetResumeAfter(castedResumeToken)
}

// TODO: Think about full document delete
cs, err := db.Watch(ctx, pipeline, opts)
if err != nil {
return nil, fmt.Errorf("failed to start change stream: %w", err)
}
Expand All @@ -52,7 +66,7 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
changeStream: cs,
ctx: ctx,
collectionsToWatchMap: collectionsToWatchMap,
offsets: persistedmap.NewPersistedMap(filePath),
offsets: storage,
}, nil
}

Expand All @@ -63,14 +77,13 @@ func (s *streaming) HasNext() bool {

func (s *streaming) Next() ([]lib.RawMessage, error) {
var rawMsgs []lib.RawMessage

// Check for new events
if s.batchSize > int32(len(rawMsgs)) && s.changeStream.Next(s.ctx) {
var changeEvent bson.M
if err := s.changeStream.Decode(&changeEvent); err != nil {
return nil, fmt.Errorf("failed to decode change event: %v", err)
}

s.offsets.Set(offsetKey, s.changeStream.ResumeToken())
ns, isOk := changeEvent["ns"]
if !isOk {
return nil, fmt.Errorf("failed to get namespace from change event: %v", changeEvent)
Expand Down

0 comments on commit 8b035cb

Please sign in to comment.