From 99abb0b2a784da2e2e00f987d14027cd22d55440 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 26 Jun 2024 10:31:09 -0700 Subject: [PATCH] Checkpoint. --- config/mongodb.go | 10 +++++----- sources/mongo/mongo.go | 11 ++++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/config/mongodb.go b/config/mongodb.go index eacd109d3..50967b0d1 100644 --- a/config/mongodb.go +++ b/config/mongodb.go @@ -9,12 +9,12 @@ import ( ) type MongoDB struct { - Host string `yaml:"host"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Database string `yaml:"database"` - + Host string `yaml:"host"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Database string `yaml:"database"` Collections []Collection `yaml:"collections"` + Streaming bool `yaml:"streaming,omitempty"` } type Collection struct { diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go index 90a9e97e9..a7d738f2e 100644 --- a/sources/mongo/mongo.go +++ b/sources/mongo/mongo.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "github.com/artie-labs/reader/sources" "log/slog" "time" @@ -15,12 +16,12 @@ import ( "github.com/artie-labs/reader/writers" ) -type Source struct { +type SnapshotStore struct { cfg config.MongoDB db *mongo.Database } -func Load(cfg config.MongoDB) (*Source, error) { +func Load(cfg config.MongoDB) (sources.Source, error) { creds := options.Credential{ Username: cfg.Username, Password: cfg.Password, @@ -38,18 +39,18 @@ func Load(cfg config.MongoDB) (*Source, error) { } db := client.Database(cfg.Database) - return &Source{ + return &SnapshotStore{ cfg: cfg, db: db, }, nil } -func (s *Source) Close() error { +func (s *SnapshotStore) Close() error { // MongoDB doesn't need to be closed. return nil } -func (s *Source) Run(ctx context.Context, writer writers.Writer) error { +func (s *SnapshotStore) Run(ctx context.Context, writer writers.Writer) error { for _, collection := range s.cfg.Collections { snapshotStartTime := time.Now()