Skip to content

Commit

Permalink
Pass writer.Writer instead of destinations.Destination
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 29, 2024
1 parent a9b4d59 commit c2651fc
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 21 deletions.
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/lib/writer"
"github.com/artie-labs/reader/sources"
"github.com/artie-labs/reader/sources/dynamodb"
"github.com/artie-labs/reader/sources/mongo"
Expand Down Expand Up @@ -99,7 +100,7 @@ func main() {
}
defer source.Close()

if err = source.Run(ctx, destination); err != nil {
if err = source.Run(ctx, writer.New(destination)); err != nil {
logger.Fatal("Failed to run",
slog.Any("err", err),
slog.String("source", string(cfg.Source)),
Expand Down
5 changes: 2 additions & 3 deletions sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/logger"
Expand All @@ -29,12 +28,12 @@ func (s *SnapshotStore) Close() error {
return nil
}

func (s *SnapshotStore) Run(ctx context.Context, destination destinations.Destination) error {
func (s *SnapshotStore) Run(ctx context.Context, _writer writer.Writer) error {
if err := s.scanFilesOverBucket(); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
}

if err := s.streamAndPublish(ctx, writer.New(destination)); err != nil {
if err := s.streamAndPublish(ctx, _writer); err != nil {
return fmt.Errorf("stream and publish failed: %w", err)
}

Expand Down
5 changes: 2 additions & 3 deletions sources/dynamodb/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/writer"
"github.com/artie-labs/reader/sources/dynamodb/offsets"
"github.com/aws/aws-sdk-go/aws"
Expand All @@ -28,11 +27,11 @@ func (s *StreamStore) Close() error {
return nil
}

func (s *StreamStore) Run(ctx context.Context, destination destinations.Destination) error {
func (s *StreamStore) Run(ctx context.Context, _writer writer.Writer) error {
ticker := time.NewTicker(shardScannerInterval)

// Start to subscribe to the channel
go s.ListenToChannel(ctx, writer.New(destination))
go s.ListenToChannel(ctx, _writer)

// Scan it for the first time manually, so we don't have to wait 5 mins
if err := s.scanForNewShards(); err != nil {
Expand Down
5 changes: 1 addition & 4 deletions sources/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/writer"
)

Expand Down Expand Up @@ -50,9 +49,7 @@ func (s *Source) Close() error {
return nil
}

func (s *Source) Run(ctx context.Context, destination destinations.Destination) error {
_writer := writer.New(destination)

func (s *Source) Run(ctx context.Context, _writer writer.Writer) error {
for _, collection := range s.cfg.Collections {
snapshotStartTime := time.Now()

Expand Down
5 changes: 1 addition & 4 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
_ "github.com/go-sql-driver/mysql"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/lib/writer"
Expand All @@ -38,9 +37,7 @@ func (s Source) Close() error {
return s.db.Close()
}

func (s *Source) Run(ctx context.Context, destination destinations.Destination) error {
_writer := writer.New(destination)

func (s *Source) Run(ctx context.Context, _writer writer.Writer) error {
for _, tableCfg := range s.cfg.Tables {
if err := s.snapshotTable(ctx, _writer, *tableCfg); err != nil {
return err
Expand Down
5 changes: 1 addition & 4 deletions sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
_ "github.com/jackc/pgx/v5/stdlib"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/lib/writer"
Expand Down Expand Up @@ -39,9 +38,7 @@ func (s *Source) Close() error {
return s.db.Close()
}

func (s *Source) Run(ctx context.Context, destination destinations.Destination) error {
_writer := writer.New(destination)

func (s *Source) Run(ctx context.Context, _writer writer.Writer) error {
for _, tableCfg := range s.cfg.Tables {
logger := slog.With(slog.String("schema", tableCfg.Schema), slog.String("table", tableCfg.Name))
snapshotStartTime := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package sources
import (
"context"

"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib/writer"
)

type Source interface {
Close() error
Run(ctx context.Context, destination destinations.Destination) error
Run(ctx context.Context, _writer writer.Writer) error
}

0 comments on commit c2651fc

Please sign in to comment.