Skip to content

Commit

Permalink
[Mongo] Moving files around (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 19, 2024
1 parent d71d8a5 commit ac67568
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 14 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func buildSource(ctx context.Context, cfg *config.Settings) (sources.Source, boo
case config.SourceDynamo:
source, isStreamingMode, err = dynamodb.Load(ctx, *cfg.DynamoDB)
case config.SourceMongoDB:
return mongo.Load(*cfg.MongoDB)
return mongo.Load(ctx, *cfg.MongoDB)
case config.SourceMSSQL:
source, err = mssql.Load(*cfg.MSSQL)
case config.SourceMySQL:
Expand Down
9 changes: 2 additions & 7 deletions sources/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type Source struct {
db *mongo.Database
}

func Load(cfg config.MongoDB) (*Source, bool, error) {
ctx := context.Background()
func Load(ctx context.Context, cfg config.MongoDB) (*Source, bool, error) {
client, err := mongo.Connect(ctx, mongoLib.OptsFromConfig(cfg))
if err != nil {
return nil, false, fmt.Errorf("failed to connect to MongoDB: %w", err)
Expand All @@ -30,11 +29,7 @@ func Load(cfg config.MongoDB) (*Source, bool, error) {
return nil, false, fmt.Errorf("failed to ping MongoDB: %w", err)
}

db := client.Database(cfg.Database)
return &Source{
cfg: cfg,
db: db,
}, cfg.StreamingSettings.Enabled, nil
return &Source{cfg: cfg, db: client.Database(cfg.Database)}, cfg.StreamingSettings.Enabled, nil
}

func (s *Source) Close() error {
Expand Down
11 changes: 7 additions & 4 deletions sources/mongo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package mongo
import (
"context"
"fmt"
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
mongoLib "github.com/artie-labs/reader/lib/mongo"
)

type snapshotIterator struct {
Expand Down Expand Up @@ -79,7 +82,7 @@ func (s *snapshotIterator) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to decode document: %w", err)
}

mgoMsg, err := ParseMessage(result, nil, "r")
mgoMsg, err := mongoLib.ParseMessage(result, nil, "r")
if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion sources/mongo/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
mongoLib "github.com/artie-labs/reader/lib/mongo"
"github.com/artie-labs/reader/lib/storage/persistedmap"
)

Expand Down Expand Up @@ -108,7 +109,7 @@ func (s *streaming) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to decode change event: %w", err)
}

changeEvent, err := NewChangeEvent(rawChangeEvent)
changeEvent, err := mongoLib.NewChangeEvent(rawChangeEvent)
if err != nil {
return nil, fmt.Errorf("failed to parse change event: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion writers/transfer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/mongo"
"github.com/artie-labs/reader/lib/mongo"
)

func TestWriter_MessageToEvent(t *testing.T) {
Expand Down

0 comments on commit ac67568

Please sign in to comment.