Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-aws-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 8, 2024
2 parents 2005a4d + 411ed2a commit ca81ea4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
7 changes: 4 additions & 3 deletions config/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ type MongoDB struct {
}

type Collection struct {
Name string `yaml:"name"`
BatchSize int32 `yaml:"batchSize,omitempty"`
// TODO: In the future, we should be able to support customers passing Start/End PK values.
Name string `yaml:"name"`
BatchSize int32 `yaml:"batchSize,omitempty"`
StartObjectID string `yaml:"startObjectID,omitempty"`
EndObjectID string `yaml:"endObjectID,omitempty"`
}

func (c Collection) TopicSuffix(db string) string {
Expand Down
27 changes: 24 additions & 3 deletions sources/mongo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package mongo
import (
"context"
"fmt"
"github.com/artie-labs/reader/lib/iterator"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
Expand Down Expand Up @@ -41,9 +41,30 @@ func (s *snapshotIterator) Next() ([]lib.RawMessage, error) {

ctx := context.Background()
if s.cursor == nil {
// Filter
filter := bson.D{}
if s.collection.StartObjectID != "" {
key, err := primitive.ObjectIDFromHex(s.collection.StartObjectID)
if err != nil {
return nil, fmt.Errorf("failed to parse start object id %q: %w", s.collection.StartObjectID, err)
}

filter = append(filter, bson.E{Key: "_id", Value: bson.D{{Key: "$gte", Value: key}}})
}

if s.collection.EndObjectID != "" {
key, err := primitive.ObjectIDFromHex(s.collection.EndObjectID)
if err != nil {
return nil, fmt.Errorf("failed to parse end object id %q: %w", s.collection.EndObjectID, err)
}

filter = append(filter, bson.E{Key: "_id", Value: bson.D{{Key: "$lte", Value: key}}})
}

// Find options
findOptions := options.Find()
findOptions.SetBatchSize(s.collection.GetBatchSize())
cursor, err := s.db.Collection(s.collection.Name).Find(ctx, bson.D{}, findOptions)
cursor, err := s.db.Collection(s.collection.Name).Find(ctx, filter, findOptions)
if err != nil {
return nil, fmt.Errorf("failed to find documents: %w", err)
}
Expand Down

0 comments on commit ca81ea4

Please sign in to comment.