diff --git a/config/mongodb.go b/config/mongodb.go index 91a87ea1..2409d40e 100644 --- a/config/mongodb.go +++ b/config/mongodb.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/artie-labs/reader/constants" - "github.com/artie-labs/transfer/lib/stringutil" ) func (s StreamingSettings) Validate() error { @@ -27,9 +26,7 @@ type StreamingSettings struct { } type MongoDB struct { - Host string `yaml:"host"` - Username string `yaml:"username,omitempty"` - Password string `yaml:"password,omitempty"` + URI string `yaml:"uri"` Database string `yaml:"database"` Collections []Collection `yaml:"collections"` StreamingSettings StreamingSettings `yaml:"streamingSettings,omitempty"` @@ -38,6 +35,11 @@ type MongoDB struct { // DisableFullDocumentBeforeChange - This is relevant if you're connecting to Document DB. // BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field. DisableFullDocumentBeforeChange bool `yaml:"disableFullDocumentBeforeChange,omitempty"` + + // Deprecated - use [MongoDB.URI] instead. + Host string `yaml:"host"` + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` } type Collection struct { @@ -60,8 +62,12 @@ func (m MongoDB) GetStreamingBatchSize() int32 { } func (m MongoDB) Validate() error { - if stringutil.Empty(m.Host, m.Database) { - return fmt.Errorf("one of the MongoDB settings is empty: host or database") + if m.Host == "" && m.URI == "" { + return fmt.Errorf("either host or URI must be passed in") + } + + if m.Database == "" { + return fmt.Errorf("database is empty") } if len(m.Collections) == 0 { diff --git a/lib/mongo/config.go b/lib/mongo/config.go index 3430026e..f955f070 100644 --- a/lib/mongo/config.go +++ b/lib/mongo/config.go @@ -2,22 +2,32 @@ package mongo import ( "crypto/tls" + "fmt" + "github.com/artie-labs/reader/config" "go.mongodb.org/mongo-driver/mongo/options" ) -func OptsFromConfig(cfg config.MongoDB) *options.ClientOptions { - opts := options.Client().ApplyURI(cfg.Host) - if !cfg.DisableTLS { - opts = opts.SetTLSConfig(&tls.Config{}) +func OptsFromConfig(cfg config.MongoDB) (*options.ClientOptions, error) { + opts := options.Client() + + if cfg.URI != "" { + opts = opts.ApplyURI(cfg.URI) + } else if cfg.Host != "" { + opts = opts.ApplyURI(cfg.Host) + if cfg.Username != "" && cfg.Password != "" { + opts = opts.SetAuth(options.Credential{ + Username: cfg.Username, + Password: cfg.Password, + }) + } + } else { + return nil, fmt.Errorf("mongoDB requires a URI or host") } - if cfg.Username != "" && cfg.Password != "" { - opts = opts.SetAuth(options.Credential{ - Username: cfg.Username, - Password: cfg.Password, - }) + if !cfg.DisableTLS { + opts = opts.SetTLSConfig(&tls.Config{}) } - return opts + return opts, nil } diff --git a/lib/mongo/config_test.go b/lib/mongo/config_test.go index 19bdaa30..4a9ed487 100644 --- a/lib/mongo/config_test.go +++ b/lib/mongo/config_test.go @@ -1,9 +1,10 @@ package mongo import ( + "testing" + "github.com/artie-labs/reader/config" "github.com/stretchr/testify/assert" - "testing" ) func TestOptsFromConfig(t *testing.T) { @@ -14,7 +15,8 @@ func TestOptsFromConfig(t *testing.T) { Password: "password", } - opts := OptsFromConfig(cfg) + opts, err := OptsFromConfig(cfg) + assert.NoError(t, err) assert.NotNil(t, opts.TLSConfig) assert.Equal(t, "user", opts.Auth.Username) assert.Equal(t, "password", opts.Auth.Password) @@ -25,7 +27,8 @@ func TestOptsFromConfig(t *testing.T) { Host: "localhost", } - opts := OptsFromConfig(cfg) + opts, err := OptsFromConfig(cfg) + assert.NoError(t, err) assert.Nil(t, opts.Auth) } { @@ -35,7 +38,20 @@ func TestOptsFromConfig(t *testing.T) { DisableTLS: true, } - opts := OptsFromConfig(cfg) + opts, err := OptsFromConfig(cfg) + assert.NoError(t, err) assert.Nil(t, opts.TLSConfig) } + { + // Using URI: + cfg := config.MongoDB{ + URI: "mongodb://user:pass@localhost", + } + + opts, err := OptsFromConfig(cfg) + assert.NoError(t, err) + assert.NotNil(t, opts.TLSConfig) + assert.Equal(t, "user", opts.Auth.Username) + assert.Equal(t, "pass", opts.Auth.Password) + } } diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go index 21fbc070..cd7f132c 100644 --- a/sources/mongo/mongo.go +++ b/sources/mongo/mongo.go @@ -20,7 +20,15 @@ type Source struct { } func Load(ctx context.Context, cfg config.MongoDB) (*Source, bool, error) { - client, err := mongo.Connect(ctx, mongoLib.OptsFromConfig(cfg)) + opts, err := mongoLib.OptsFromConfig(cfg) + if err != nil { + return nil, false, fmt.Errorf("failed to build options for MongoDB: %w", err) + } + if err := opts.Validate(); err != nil { + return nil, false, fmt.Errorf("validation failed for MongoDB options: %w", err) + } + + client, err := mongo.Connect(ctx, opts) if err != nil { return nil, false, fmt.Errorf("failed to connect to MongoDB: %w", err) }