Skip to content

Commit

Permalink
Add scaffolding to support multiple destinations
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 26, 2024
1 parent 232cbd1 commit 318f778
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 53 deletions.
46 changes: 34 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,41 +62,39 @@ type Metrics struct {
type Source string

const (
SourceInvalid Source = "invalid"
SourceDynamo Source = "dynamodb"
SourceMongoDB Source = "mongodb"
SourceMySQL Source = "mysql"
SourcePostgreSQL Source = "postgresql"
)

type Destination string

const (
DestinationKafka Destination = "kafka"
)

type Settings struct {
Source Source `yaml:"source"`
DynamoDB *DynamoDB `yaml:"dynamodb,omitempty"`
MongoDB *MongoDB `yaml:"mongodb,omitempty"`
MySQL *MySQL `yaml:"mysql,omitempty"`
PostgreSQL *PostgreSQL `yaml:"postgresql,omitempty"`

Destination Destination `yaml:"destination"`
Kafka *Kafka `yaml:"kafka"`

Reporting *Reporting `yaml:"reporting"`
Metrics *Metrics `yaml:"metrics"`
Kafka *Kafka `yaml:"kafka"`
}

func (s *Settings) Validate() error {
if s == nil {
return fmt.Errorf("config is nil")
}

if s.Kafka == nil {
return fmt.Errorf("kafka config is nil")
}

if err := s.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka validation failed: %w", err)
}

switch s.Source {
// By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility
case SourceDynamo, "":
case SourceDynamo:
if s.DynamoDB == nil {
return fmt.Errorf("dynamodb config is nil")
}
Expand Down Expand Up @@ -128,7 +126,21 @@ func (s *Settings) Validate() error {
if err := s.PostgreSQL.Validate(); err != nil {
return fmt.Errorf("postgres validation failed: %w", err)
}
default:
return fmt.Errorf("invalid source: %s ", s.Source)
}

switch s.Destination {
case DestinationKafka:
if s.Kafka == nil {
return fmt.Errorf("kafka config is nil")
}

if err := s.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka validation failed: %w", err)
}
default:
return fmt.Errorf("invalid destination: %s ", s.Source)
}

return nil
Expand All @@ -146,6 +158,16 @@ func ReadConfig(fp string) (*Settings, error) {
return nil, fmt.Errorf("failed to unmarshal config file: %w", err)
}

if settings.Source == "" {
// By default, if you don't pass in a source -- it will be dynamodb for backwards compatibility
settings.Source = SourceDynamo
}

if settings.Destination == "" {
// By default, if you don't pass in a destination -- it will be Kafka for backwards compatibility
settings.Destination = DestinationKafka
}

if err = settings.Validate(); err != nil {
return nil, fmt.Errorf("failed to validate config file: %w", err)
}
Expand Down
17 changes: 17 additions & 0 deletions destinations/destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package destinations

import (
"context"

"github.com/artie-labs/reader/lib"
)

type RawMessageIterator interface {
HasNext() bool
Next() ([]lib.RawMessage, error)
}

type DestinationWriter interface {
WriteIterator(ctx context.Context, iter RawMessageIterator) (int, error)
WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMessage) error
}
8 changes: 2 additions & 6 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/segmentio/kafka-go"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
"github.com/artie-labs/reader/lib/mtr"
Expand Down Expand Up @@ -134,12 +135,7 @@ func (b *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
return nil
}

type messageIterator interface {
HasNext() bool
Next() ([]lib.RawMessage, error)
}

func (b *BatchWriter) WriteIterator(ctx context.Context, iter messageIterator) (int, error) {
func (b *BatchWriter) WriteIterator(ctx context.Context, iter destinations.RawMessageIterator) (int, error) {
start := time.Now()
var count int
for iter.HasNext() {
Expand Down
44 changes: 26 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/artie-labs/transfer/lib/telemetry/metrics"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
Expand All @@ -33,19 +34,6 @@ func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) {
return client, nil
}

func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD mtr.Client) (*kafkalib.BatchWriter, error) {
if cfg == nil {
return nil, fmt.Errorf("kafka configuration is not set")
}
slog.Info("Kafka config",
slog.Bool("aws", cfg.AwsEnabled),
slog.String("kafkaBootstrapServer", cfg.BootstrapServers),
slog.Any("publishSize", cfg.GetPublishSize()),
slog.Uint64("maxRequestSize", cfg.MaxRequestSize),
)
return kafkalib.NewBatchWriter(ctx, *cfg, statsD)
}

func buildSource(cfg *config.Settings) (sources.Source, error) {
switch cfg.Source {
case "", config.SourceDynamo:
Expand All @@ -56,8 +44,28 @@ func buildSource(cfg *config.Settings) (sources.Source, error) {
return mysql.Load(*cfg.MySQL)
case config.SourcePostgreSQL:
return postgres.Load(*cfg.PostgreSQL)
default:
panic(fmt.Sprintf("unknown source: %s", cfg.Source)) // should never happen
}
}

func buildDestination(ctx context.Context, cfg *config.Settings, statsD mtr.Client) (destinations.DestinationWriter, error) {
switch cfg.Destination {
case config.DestinationKafka:
kafkaCfg := cfg.Kafka
if kafkaCfg == nil {
return nil, fmt.Errorf("kafka configuration is not set")
}
slog.Info("Kafka config",
slog.Bool("aws", kafkaCfg.AwsEnabled),
slog.String("kafkaBootstrapServer", kafkaCfg.BootstrapServers),
slog.Any("publishSize", kafkaCfg.GetPublishSize()),
slog.Uint64("maxRequestSize", kafkaCfg.MaxRequestSize),
)
return kafkalib.NewBatchWriter(ctx, *kafkaCfg, statsD)
default:
panic(fmt.Sprintf("unknown destination: %s", cfg.Destination)) // should never happen
}
panic(fmt.Sprintf("Unknown source: %s", cfg.Source)) // should never happen
}

func main() {
Expand All @@ -80,18 +88,18 @@ func main() {
logger.Fatal("Failed to set up metrics", slog.Any("err", err))
}

writer, err := setUpKafka(ctx, cfg.Kafka, statsD)
writer, err := buildDestination(ctx, cfg, statsD)
if err != nil {
logger.Fatal("Failed to set up kafka", slog.Any("err", err))
logger.Fatal(fmt.Sprintf("Failed to init '%s' destination", cfg.Destination), slog.Any("err", err))
}

source, err := buildSource(cfg)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to init %s", cfg.Source), slog.Any("err", err))
logger.Fatal(fmt.Sprintf("Failed to init '%s' source", cfg.Source), slog.Any("err", err))
}
defer source.Close()

err = source.Run(ctx, *writer)
err = source.Run(ctx, writer)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to run %s snapshot", cfg.Source), slog.Any("err", err))
}
Expand Down
6 changes: 3 additions & 3 deletions sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"

"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
)

func (s *StreamStore) ListenToChannel(ctx context.Context, writer kafkalib.BatchWriter) {
func (s *StreamStore) ListenToChannel(ctx context.Context, writer destinations.DestinationWriter) {
for shard := range s.shardChan {
go s.processShard(ctx, shard, writer)
}
}

func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer kafkalib.BatchWriter) {
func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, writer destinations.DestinationWriter) {
var attempts int

// Is there another go-routine processing this shard?
Expand Down
6 changes: 3 additions & 3 deletions sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
)
Expand All @@ -28,7 +28,7 @@ func (s *SnapshotStore) Close() error {
return nil
}

func (s *SnapshotStore) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *SnapshotStore) Run(ctx context.Context, writer destinations.DestinationWriter) error {
if err := s.scanFilesOverBucket(); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
}
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *SnapshotStore) scanFilesOverBucket() error {
return nil
}

func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *SnapshotStore) streamAndPublish(ctx context.Context, writer destinations.DestinationWriter) error {
keys, err := s.retrievePrimaryKeys()
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions sources/dynamodb/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand All @@ -27,7 +27,7 @@ func (s *StreamStore) Close() error {
return nil
}

func (s *StreamStore) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *StreamStore) Run(ctx context.Context, writer destinations.DestinationWriter) error {
ticker := time.NewTicker(shardScannerInterval)

// Start to subscribe to the channel
Expand Down
4 changes: 2 additions & 2 deletions sources/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/destinations"
)

type Source struct {
Expand Down Expand Up @@ -49,7 +49,7 @@ func (s *Source) Close() error {
return nil
}

func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error {
for _, collection := range s.cfg.Collections {
snapshotStartTime := time.Now()

Expand Down
6 changes: 3 additions & 3 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
_ "github.com/go-sql-driver/mysql"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/sources/mysql/adapter"
)
Expand All @@ -37,7 +37,7 @@ func (s Source) Close() error {
return s.db.Close()
}

func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error {
for _, tableCfg := range s.cfg.Tables {
if err := s.snapshotTable(ctx, writer, *tableCfg); err != nil {
return err
Expand All @@ -46,7 +46,7 @@ func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
return nil
}

func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, tableCfg config.MySQLTable) error {
func (s Source) snapshotTable(ctx context.Context, writer destinations.DestinationWriter, tableCfg config.MySQLTable) error {
logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database))
snapshotStartTime := time.Now()

Expand Down
4 changes: 2 additions & 2 deletions sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
_ "github.com/jackc/pgx/v5/stdlib"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/sources/postgres/adapter"
)
Expand All @@ -38,7 +38,7 @@ func (s *Source) Close() error {
return s.db.Close()
}

func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
func (s *Source) Run(ctx context.Context, writer destinations.DestinationWriter) error {
for _, tableCfg := range s.cfg.Tables {
logger := slog.With(slog.String("schema", tableCfg.Schema), slog.String("table", tableCfg.Name))
snapshotStartTime := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package sources
import (
"context"

"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/destinations"
)

type Source interface {
Close() error
Run(ctx context.Context, writer kafkalib.BatchWriter) error
Run(ctx context.Context, writer destinations.DestinationWriter) error
}

0 comments on commit 318f778

Please sign in to comment.