Skip to content

Commit

Permalink
[dynamodb] Use batch iterator for writing (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 29, 2024
1 parent b6647c8 commit a9b4d59
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 83 deletions.
42 changes: 15 additions & 27 deletions lib/debezium/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/artie-labs/transfer/lib/debezium"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium/converters"
)

Expand Down Expand Up @@ -57,33 +58,20 @@ func (m mockAdatper) NewIterator() (RowsIterator, error) {
return m.iter, nil
}

type mockIterator struct {
returnErr bool
index int
batches [][]Row
}
type errorIterator struct{}

func (m *mockIterator) HasNext() bool {
return m.index < len(m.batches)
func (m *errorIterator) HasNext() bool {
return true
}

func (m *mockIterator) Next() ([]Row, error) {
if m.returnErr {
return nil, fmt.Errorf("test iteration error")
}

if !m.HasNext() {
return nil, fmt.Errorf("done")
}
result := m.batches[m.index]
m.index++
return result, nil
func (m *errorIterator) Next() ([]Row, error) {
return nil, fmt.Errorf("test iteration error")
}

func TestDebeziumTransformer_Iteration(t *testing.T) {
{
// Empty iterator
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{}})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: lib.NewBatchIterator([][]Row{})})
assert.NoError(t, err)
assert.False(t, transformer.HasNext())
rows, err := transformer.Next()
Expand All @@ -93,7 +81,7 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
{
// One empty batch
batches := [][]Row{{}}
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{batches: batches}})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: lib.NewBatchIterator(batches)})
assert.NoError(t, err)
assert.True(t, transformer.HasNext())
rows, err := transformer.Next()
Expand All @@ -116,7 +104,7 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
}}
transformer, err := NewDebeziumTransformer(mockAdatper{
fieldConverters: fieldConverters,
iter: &mockIterator{batches: batches},
iter: lib.NewBatchIterator(batches),
})
assert.NoError(t, err)
// First batch
Expand Down Expand Up @@ -153,7 +141,7 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
}
transformer, err := NewDebeziumTransformer(mockAdatper{
fieldConverters: fieldConverters,
iter: &mockIterator{batches: batches},
iter: lib.NewBatchIterator(batches),
})
assert.NoError(t, err)
// First batch
Expand Down Expand Up @@ -192,7 +180,7 @@ func TestDebeziumTransformer_Next(t *testing.T) {
mockAdatper{
fieldConverters: fieldConverters,
partitionKeys: []string{"foo"},
iter: &mockIterator{batches: [][]Row{{{"foo": "bar"}}}, returnErr: true},
iter: &errorIterator{},
},
)
assert.NoError(t, err)
Expand All @@ -208,7 +196,7 @@ func TestDebeziumTransformer_Next(t *testing.T) {
transformer, err := NewDebeziumTransformer(mockAdatper{
fieldConverters: fieldConverters,
partitionKeys: []string{"foo"},
iter: &mockIterator{batches: [][]Row{{{"foo": "bar"}}}},
iter: lib.NewSingleBatchIterator([]Row{{"foo": "bar"}}),
},
)
assert.NoError(t, err)
Expand All @@ -229,7 +217,7 @@ func TestDebeziumTransformer_Next(t *testing.T) {
transformer, err := NewDebeziumTransformer(mockAdatper{
fieldConverters: fieldConverters,
partitionKeys: []string{"foo", "qux"},
iter: &mockIterator{batches: batches},
iter: lib.NewBatchIterator(batches),
},
)
assert.NoError(t, err)
Expand Down Expand Up @@ -277,7 +265,7 @@ func TestDebeziumTransformer_CreatePayload(t *testing.T) {
fieldConverters := []FieldConverter{
{Name: "qux", ValueConverter: testConverter{intField: true, returnErr: true}},
}
transformer, err := NewDebeziumTransformer(mockAdatper{fieldConverters: fieldConverters, iter: &mockIterator{}})
transformer, err := NewDebeziumTransformer(mockAdatper{fieldConverters: fieldConverters, iter: lib.NewBatchIterator([][]Row{})})
assert.NoError(t, err)
_, err = transformer.createPayload(Row{"qux": "quux"})
assert.ErrorContains(t, err, `failed to convert row value for key "qux": test error`)
Expand All @@ -288,7 +276,7 @@ func TestDebeziumTransformer_CreatePayload(t *testing.T) {
{Name: "foo", ValueConverter: testConverter{intField: false}},
{Name: "qux", ValueConverter: testConverter{intField: true}},
}
transformer, err := NewDebeziumTransformer(mockAdatper{fieldConverters: fieldConverters, iter: &mockIterator{}})
transformer, err := NewDebeziumTransformer(mockAdatper{fieldConverters: fieldConverters, iter: lib.NewBatchIterator([][]Row{})})
assert.NoError(t, err)
payload, err := transformer.createPayload(Row{"foo": "bar", "qux": "quux"})
assert.NoError(t, err)
Expand Down
30 changes: 30 additions & 0 deletions lib/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package lib

import (
"fmt"

"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/cdc/util"
)
Expand Down Expand Up @@ -38,3 +40,31 @@ func (r RawMessage) GetPayload() any {

return r.payload
}

type batchIterator[T any] struct {
index int
batches [][]T
}

// Returns an iterator that produces multiple batches.
func NewBatchIterator[T any](batches [][]T) *batchIterator[T] {
return &batchIterator[T]{batches: batches}
}

// Returns an iterator that produces a single batch.
func NewSingleBatchIterator[T any](batches []T) *batchIterator[T] {
return NewBatchIterator([][]T{batches})
}

func (bi *batchIterator[T]) HasNext() bool {
return bi.index < len(bi.batches)
}

func (bi *batchIterator[T]) Next() ([]T, error) {
if !bi.HasNext() {
return nil, fmt.Errorf("iterator has finished")
}
result := bi.batches[bi.index]
bi.index++
return result, nil
}
81 changes: 81 additions & 0 deletions lib/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package lib

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_BatchIterator(t *testing.T) {
{
// No batches
iter := NewBatchIterator([][]string{})
assert.False(t, iter.HasNext())
_, err := iter.Next()
assert.ErrorContains(t, err, "iterator has finished")
}
{
// One empty batch
iter := NewBatchIterator([][]string{{}})
assert.True(t, iter.HasNext())
batch, err := iter.Next()
assert.NoError(t, err)
assert.Empty(t, batch)
assert.False(t, iter.HasNext())
_, err = iter.Next()
assert.ErrorContains(t, err, "iterator has finished")
}
{
// Two non-empty batches one empty batch
iter := NewBatchIterator([][]string{{"a", "b"}, {}, {"c", "d"}})
assert.True(t, iter.HasNext())
{
batch, err := iter.Next()
assert.NoError(t, err)
assert.Equal(t, []string{"a", "b"}, batch)
}

assert.True(t, iter.HasNext())
{
batch, err := iter.Next()
assert.NoError(t, err)
assert.Empty(t, batch)
}

assert.True(t, iter.HasNext())
{
batch, err := iter.Next()
assert.NoError(t, err)
assert.Equal(t, []string{"c", "d"}, batch)
}

assert.False(t, iter.HasNext())
_, err := iter.Next()
assert.ErrorContains(t, err, "iterator has finished")
}
}

func Test_SingleBatchIterator(t *testing.T) {
{
// Empty batch
iter := NewSingleBatchIterator([]string{})
assert.True(t, iter.HasNext())
batch, err := iter.Next()
assert.NoError(t, err)
assert.Empty(t, batch)
assert.False(t, iter.HasNext())
_, err = iter.Next()
assert.ErrorContains(t, err, "iterator has finished")
}
{
// Non-empty batch
iter := NewSingleBatchIterator([]string{"a", "b", "c", "d"})
assert.True(t, iter.HasNext())
batch, err := iter.Next()
assert.NoError(t, err)
assert.Equal(t, []string{"a", "b", "c", "d"}, batch)
assert.False(t, iter.HasNext())
_, err = iter.Next()
assert.ErrorContains(t, err, "iterator has finished")
}
}
33 changes: 10 additions & 23 deletions lib/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,22 @@ func (m *mockDestination) WriteRawMessages(ctx context.Context, msgs []lib.RawMe
return nil
}

type mockIterator struct {
emitError bool
index int
batches [][]lib.RawMessage
}
type errorIterator struct{}

func (m *mockIterator) HasNext() bool {
return m.index < len(m.batches)
func (m *errorIterator) HasNext() bool {
return true
}

func (m *mockIterator) Next() ([]lib.RawMessage, error) {
if m.emitError {
return nil, fmt.Errorf("test iteration error")
}

if !m.HasNext() {
return nil, fmt.Errorf("done")
}
result := m.batches[m.index]
m.index++
return result, nil
func (m *errorIterator) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("test iteration error")
}

func TestWriter_Write(t *testing.T) {
{
// Empty iterator
destination := &mockDestination{}
writer := New(destination)
iterator := &mockIterator{}
iterator := lib.NewBatchIterator([][]lib.RawMessage{})
count, err := writer.Write(context.Background(), iterator)
assert.NoError(t, err)
assert.Equal(t, 0, count)
Expand All @@ -60,7 +47,7 @@ func TestWriter_Write(t *testing.T) {
// Iteration error
destination := &mockDestination{}
writer := New(destination)
iterator := &mockIterator{emitError: true, batches: [][]lib.RawMessage{{{TopicSuffix: "a"}}}}
iterator := &errorIterator{}
_, err := writer.Write(context.Background(), iterator)
assert.ErrorContains(t, err, "failed to iterate over messages: test iteration error")
assert.Empty(t, destination.messages)
Expand All @@ -69,7 +56,7 @@ func TestWriter_Write(t *testing.T) {
// Two empty batches
destination := &mockDestination{}
writer := New(destination)
iterator := &mockIterator{batches: [][]lib.RawMessage{{}, {}}}
iterator := lib.NewBatchIterator([][]lib.RawMessage{{}, {}})
count, err := writer.Write(context.Background(), iterator)
assert.NoError(t, err)
assert.Equal(t, 0, count)
Expand All @@ -79,7 +66,7 @@ func TestWriter_Write(t *testing.T) {
// Three batches, two non-empty
destination := &mockDestination{}
writer := New(destination)
iterator := &mockIterator{batches: [][]lib.RawMessage{{{TopicSuffix: "a"}}, {}, {{TopicSuffix: "b"}, {TopicSuffix: "c"}}}}
iterator := lib.NewBatchIterator([][]lib.RawMessage{{{TopicSuffix: "a"}}, {}, {{TopicSuffix: "b"}, {TopicSuffix: "c"}}})
count, err := writer.Write(context.Background(), iterator)
assert.NoError(t, err)
assert.Equal(t, 3, count)
Expand All @@ -92,7 +79,7 @@ func TestWriter_Write(t *testing.T) {
// Destination error
destination := &mockDestination{emitError: true}
writer := New(destination)
iterator := &mockIterator{batches: [][]lib.RawMessage{{{TopicSuffix: "a"}}}}
iterator := lib.NewSingleBatchIterator([]lib.RawMessage{{TopicSuffix: "a"}})
_, err := writer.Write(context.Background(), iterator)
assert.ErrorContains(t, err, "failed to write messages: test write-raw-messages error")
assert.Empty(t, destination.messages)
Expand Down
11 changes: 6 additions & 5 deletions sources/dynamodb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"

"github.com/artie-labs/reader/destinations"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/writer"
)

func (s *StreamStore) ListenToChannel(ctx context.Context, destination destinations.Destination) {
func (s *StreamStore) ListenToChannel(ctx context.Context, _writer writer.Writer) {
for shard := range s.shardChan {
go s.processShard(ctx, shard, destination)
go s.processShard(ctx, shard, _writer)
}
}

func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, destination destinations.Destination) {
func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.Shard, _writer writer.Writer) {
var attempts int

// Is there another go-routine processing this shard?
Expand Down Expand Up @@ -97,7 +97,8 @@ func (s *StreamStore) processShard(ctx context.Context, shard *dynamodbstreams.S
messages = append(messages, msg.RawMessage())
}

if err = destination.WriteRawMessages(ctx, messages); err != nil {
// TODO: Create an actual iterator over the shards that is passed to the writer.
if _, err = _writer.Write(ctx, lib.NewSingleBatchIterator(messages)); err != nil {
logger.Panic("Failed to publish messages, exiting...", slog.Any("err", err))
}

Expand Down
8 changes: 5 additions & 3 deletions sources/dynamodb/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/artie-labs/reader/lib/writer"
)

type SnapshotStore struct {
Expand All @@ -33,7 +34,7 @@ func (s *SnapshotStore) Run(ctx context.Context, destination destinations.Destin
return fmt.Errorf("scanning files over bucket failed: %w", err)
}

if err := s.streamAndPublish(ctx, destination); err != nil {
if err := s.streamAndPublish(ctx, writer.New(destination)); err != nil {
return fmt.Errorf("stream and publish failed: %w", err)
}

Expand Down Expand Up @@ -64,7 +65,7 @@ func (s *SnapshotStore) scanFilesOverBucket() error {
return nil
}

func (s *SnapshotStore) streamAndPublish(ctx context.Context, destination destinations.Destination) error {
func (s *SnapshotStore) streamAndPublish(ctx context.Context, _writer writer.Writer) error {
keys, err := s.retrievePrimaryKeys()
if err != nil {
return fmt.Errorf("failed to retrieve primary keys: %w", err)
Expand Down Expand Up @@ -92,7 +93,8 @@ func (s *SnapshotStore) streamAndPublish(ctx context.Context, destination destin
messages = append(messages, dynamoMsg.RawMessage())
}

if err = destination.WriteRawMessages(ctx, messages); err != nil {
// TODO: Create an actual iterator over the files that is passed to the writer.
if _, err := _writer.Write(ctx, lib.NewSingleBatchIterator(messages)); err != nil {
return fmt.Errorf("failed to publish messages: %w", err)
}

Expand Down
Loading

0 comments on commit a9b4d59

Please sign in to comment.