diff --git a/config/config.go b/config/config.go index 9718b094..e5c0aaf2 100644 --- a/config/config.go +++ b/config/config.go @@ -62,13 +62,18 @@ type Metrics struct { type Source string const ( - SourceInvalid Source = "invalid" SourceDynamo Source = "dynamodb" SourceMongoDB Source = "mongodb" SourceMySQL Source = "mysql" SourcePostgreSQL Source = "postgresql" ) +type Destination string + +const ( + DestinationKafka Destination = "kafka" +) + type Settings struct { Source Source `yaml:"source"` DynamoDB *DynamoDB `yaml:"dynamodb,omitempty"` @@ -76,9 +81,11 @@ type Settings struct { MySQL *MySQL `yaml:"mysql,omitempty"` PostgreSQL *PostgreSQL `yaml:"postgresql,omitempty"` + Destination Destination `yaml:"destination"` + Kafka *Kafka `yaml:"kafka"` + Reporting *Reporting `yaml:"reporting"` Metrics *Metrics `yaml:"metrics"` - Kafka *Kafka `yaml:"kafka"` } func (s *Settings) Validate() error { @@ -86,17 +93,8 @@ func (s *Settings) Validate() error { return fmt.Errorf("config is nil") } - if s.Kafka == nil { - return fmt.Errorf("kafka config is nil") - } - - if err := s.Kafka.Validate(); err != nil { - return fmt.Errorf("kafka validation failed: %w", err) - } - switch s.Source { - // By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility - case SourceDynamo, "": + case SourceDynamo: if s.DynamoDB == nil { return fmt.Errorf("dynamodb config is nil") } @@ -128,7 +126,21 @@ func (s *Settings) Validate() error { if err := s.PostgreSQL.Validate(); err != nil { return fmt.Errorf("postgres validation failed: %w", err) } + default: + return fmt.Errorf("invalid source: %s", s.Source) + } + switch s.Destination { + case DestinationKafka: + if s.Kafka == nil { + return fmt.Errorf("kafka config is nil") + } + + if err := s.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka validation failed: %w", err) + } + default: + return fmt.Errorf("invalid destination: %s", s.Destination) } return nil @@ -146,6 +158,16 @@ func ReadConfig(fp string) 
(*Settings, error) { return nil, fmt.Errorf("failed to unmarshal config file: %w", err) } + if settings.Source == "" { + // By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility + settings.Source = SourceDynamo + } + + if settings.Destination == "" { + // By default, if you don't pass in a destination -- it will be Kafka for backwards compatibility + settings.Destination = DestinationKafka + } + if err = settings.Validate(); err != nil { return nil, fmt.Errorf("failed to validate config file: %w", err) } diff --git a/destinations/destination.go b/destinations/destination.go new file mode 100644 index 00000000..f8deca9f --- /dev/null +++ b/destinations/destination.go @@ -0,0 +1,17 @@ +package destinations + +import ( + "context" + + "github.com/artie-labs/reader/lib" +) + +type RawMessageIterator interface { + HasNext() bool + Next() ([]lib.RawMessage, error) +} + +type DestinationWriter interface { + WriteIterator(ctx context.Context, iter RawMessageIterator) (int, error) + WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error +} diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 1a4f344d..dd57b144 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -11,6 +11,7 @@ import ( "github.com/segmentio/kafka-go" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/iterator" "github.com/artie-labs/reader/lib/mtr" @@ -134,12 +135,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e return nil } -type messageIterator interface { - HasNext() bool - Next() ([]lib.RawMessage, error) -} - -func (b *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) { +func (b *BatchWriter) WriteIterator(ctx context.Context, iter destinations.RawMessageIterator) (int, error) { start := time.Now() var count int for iter.HasNext() { diff --git 
a/main.go b/main.go index e4cf99d3..62062102 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/artie-labs/transfer/lib/telemetry/metrics" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/reader/lib/mtr" @@ -33,19 +34,6 @@ func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) { return client, nil } -func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD mtr.Client) (*kafkalib.BatchWriter, error) { - if cfg == nil { - return nil, fmt.Errorf("kafka configuration is not set") - } - slog.Info("Kafka config", - slog.Bool("aws", cfg.AwsEnabled), - slog.String("kafkaBootstrapServer", cfg.BootstrapServers), - slog.Any("publishSize", cfg.GetPublishSize()), - slog.Uint64("maxRequestSize", cfg.MaxRequestSize), - ) - return kafkalib.NewBatchWriter(ctx, *cfg, statsD) -} - func buildSource(cfg *config.Settings) (sources.Source, error) { switch cfg.Source { case "", config.SourceDynamo: @@ -56,8 +44,28 @@ func buildSource(cfg *config.Settings) (sources.Source, error) { return mysql.Load(*cfg.MySQL) case config.SourcePostgreSQL: return postgres.Load(*cfg.PostgreSQL) + default: + panic(fmt.Sprintf("unknown source: %s", cfg.Source)) // should never happen + } +} + +func buildDestination(ctx context.Context, cfg *config.Settings, statsD mtr.Client) (destinations.DestinationWriter, error) { + switch cfg.Destination { + case config.DestinationKafka: + kafkaCfg := cfg.Kafka + if kafkaCfg == nil { + return nil, fmt.Errorf("kafka configuration is not set") + } + slog.Info("Kafka config", + slog.Bool("aws", kafkaCfg.AwsEnabled), + slog.String("kafkaBootstrapServer", kafkaCfg.BootstrapServers), + slog.Any("publishSize", kafkaCfg.GetPublishSize()), + slog.Uint64("maxRequestSize", kafkaCfg.MaxRequestSize), + ) + return kafkalib.NewBatchWriter(ctx, *kafkaCfg, statsD) + default: + panic(fmt.Sprintf("unknown destination: 
%s", cfg.Destination)) // should never happen } - panic(fmt.Sprintf("Unknown source: %s", cfg.Source)) // should never happen } func main() { @@ -80,18 +88,18 @@ func main() { logger.Fatal("Failed to set up metrics", slog.Any("err", err)) } - writer, err := setUpKafka(ctx, cfg.Kafka, statsD) + writer, err := buildDestination(ctx, cfg, statsD) if err != nil { - logger.Fatal("Failed to set up kafka", slog.Any("err", err)) + logger.Fatal(fmt.Sprintf("Failed to init '%s' destination", cfg.Destination), slog.Any("err", err)) } source, err := buildSource(cfg) if err != nil { - logger.Fatal(fmt.Sprintf("Failed to init %s", cfg.Source), slog.Any("err", err)) + logger.Fatal(fmt.Sprintf("Failed to init '%s' source", cfg.Source), slog.Any("err", err)) } defer source.Close() - err = source.Run(ctx, *writer) + err = source.Run(ctx, writer) if err != nil { logger.Fatal(fmt.Sprintf("Failed to run %s snapshot", cfg.Source), slog.Any("err", err)) } diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index ae424b17..84034056 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -9,19 +9,19 @@ import ( "github.com/artie-labs/transfer/lib/ptr" "github.com/aws/aws-sdk-go/service/dynamodbstreams" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/dynamo" - "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" ) -func (s *StreamStore) ListenToChannel(ctx context.Context, writer kafkalib.BatchWriter) { +func (s *StreamStore) ListenToChannel(ctx context.Context, writer destinations.DestinationWriter) { for shard := range s.shardChan { go s.processShard(ctx, shard, writer) } } -func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer kafkalib.BatchWriter) { +func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer destinations.DestinationWriter) { var attempts int // Is there another 
go-routine processing this shard? diff --git a/sources/dynamodb/snapshot.go b/sources/dynamodb/snapshot.go index 1efa6d11..c4b9b24d 100644 --- a/sources/dynamodb/snapshot.go +++ b/sources/dynamodb/snapshot.go @@ -8,9 +8,9 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/dynamo" - "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/reader/lib/s3lib" ) @@ -28,7 +28,7 @@ func (s *SnapshotStore) Close() error { return nil } -func (s *SnapshotStore) Run(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *SnapshotStore) Run(ctx context.Context, writer destinations.DestinationWriter) error { if err := s.scanFilesOverBucket(); err != nil { return fmt.Errorf("scanning files over bucket failed: %w", err) } @@ -64,7 +64,7 @@ func (s *SnapshotStore) scanFilesOverBucket() error { return nil } -func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer destinations.DestinationWriter) error { keys, err := s.retrievePrimaryKeys() if err != nil { return fmt.Errorf("failed to retrieve primary keys: %w", err) diff --git a/sources/dynamodb/stream.go b/sources/dynamodb/stream.go index 390f0ee7..181e2038 100644 --- a/sources/dynamodb/stream.go +++ b/sources/dynamodb/stream.go @@ -7,7 +7,7 @@ import ( "time" "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/lib/kafkalib" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/sources/dynamodb/offsets" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodbstreams" @@ -27,7 +27,7 @@ func (s *StreamStore) Close() error { return nil } -func (s *StreamStore) Run(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *StreamStore) 
Run(ctx context.Context, writer destinations.DestinationWriter) error { ticker := time.NewTicker(shardScannerInterval) // Start to subscribe to the channel diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go index 22a4f5ad..250fe79a 100644 --- a/sources/mongo/mongo.go +++ b/sources/mongo/mongo.go @@ -12,7 +12,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/lib/kafkalib" + "github.com/artie-labs/reader/destinations" ) type Source struct { @@ -49,7 +49,7 @@ func (s *Source) Close() error { return nil } -func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error { for _, collection := range s.cfg.Collections { snapshotStartTime := time.Now() diff --git a/sources/mysql/snapshot.go b/sources/mysql/snapshot.go index 57d66f69..02a260eb 100644 --- a/sources/mysql/snapshot.go +++ b/sources/mysql/snapshot.go @@ -11,8 +11,8 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib/debezium/transformer" - "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/rdbms" "github.com/artie-labs/reader/sources/mysql/adapter" ) @@ -37,7 +37,7 @@ func (s Source) Close() error { return s.db.Close() } -func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error { for _, tableCfg := range s.cfg.Tables { if err := s.snapshotTable(ctx, writer, *tableCfg); err != nil { return err @@ -46,7 +46,7 @@ func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { return nil } -func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, tableCfg config.MySQLTable) error { +func (s Source) snapshotTable(ctx context.Context, writer 
destinations.DestinationWriter, tableCfg config.MySQLTable) error { logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database)) snapshotStartTime := time.Now() diff --git a/sources/postgres/snapshot.go b/sources/postgres/snapshot.go index 24f5f6e2..33e9f704 100644 --- a/sources/postgres/snapshot.go +++ b/sources/postgres/snapshot.go @@ -11,8 +11,8 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/destinations" "github.com/artie-labs/reader/lib/debezium/transformer" - "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/rdbms" "github.com/artie-labs/reader/sources/postgres/adapter" ) @@ -38,7 +38,7 @@ func (s *Source) Close() error { return s.db.Close() } -func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { +func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error { for _, tableCfg := range s.cfg.Tables { logger := slog.With(slog.String("schema", tableCfg.Schema), slog.String("table", tableCfg.Name)) snapshotStartTime := time.Now() diff --git a/sources/source.go b/sources/source.go index ce5f4b7f..49c7623e 100644 --- a/sources/source.go +++ b/sources/source.go @@ -3,10 +3,10 @@ package sources import ( "context" - "github.com/artie-labs/reader/lib/kafkalib" + "github.com/artie-labs/reader/destinations" ) type Source interface { Close() error - Run(ctx context.Context, writer kafkalib.BatchWriter) error + Run(ctx context.Context, writer destinations.DestinationWriter) error }