Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Transfer + using new dedupe #369

Merged
merged 4 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,3 @@ go.work

# GoReleaser
dist/

6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ release:
.PHONY: clean
clean:
go clean -testcache

.PHONY: generate
generate:
go get github.com/maxbrunsfeld/counterfeiter/v6
go generate ./...
go mod tidy
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/artie-labs/transfer v1.24.2
github.com/artie-labs/transfer v1.24.4
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.19
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/artie-labs/transfer v1.24.2 h1:FbGxHbx7hEwxMN/X22QJztnrGr0W6plrEB7leb7e2Eo=
github.com/artie-labs/transfer v1.24.2/go.mod h1:mlDGYVa9CH93Rrcsh2bxrZW6HxSG65lnUjSOot8+oIc=
github.com/artie-labs/transfer v1.24.4 h1:JvHDV4g+MduJyeKHl7TGf6JzsoFejgktp1u2PqUoKIQ=
github.com/artie-labs/transfer v1.24.4/go.mod h1:mlDGYVa9CH93Rrcsh2bxrZW6HxSG65lnUjSOot8+oIc=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY=
github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
Expand Down
208 changes: 208 additions & 0 deletions lib/mocks/client.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions lib/mocks/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package mocks holds generated test doubles. The directives below are picked
// up by `go generate`: the first runs counterfeiter in -generate mode, and the
// second tells it to write a fake for the Client type in ../mtr to
// client.mock.go.
package mocks

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
//counterfeiter:generate -o=client.mock.go ../mtr Client
26 changes: 21 additions & 5 deletions writers/transfer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Writer struct {
inMemDB *models.DatabaseData
tc *kafkalib.TopicConfig
destination destination.DataWarehouse

primaryKeys []string
}

func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) {
Expand All @@ -38,7 +40,7 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) {
return nil, fmt.Errorf("kafka config should have exactly one topic config")
}

destination, err := utils.LoadDataWarehouse(cfg, nil)
_destination, err := utils.LoadDataWarehouse(cfg, nil)
if err != nil {
return nil, err
}
Expand All @@ -48,7 +50,7 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) {
statsD: statsD,
inMemDB: models.NewMemoryDB(),
tc: cfg.Kafka.TopicConfigs[0],
destination: destination,
destination: _destination,
}, nil
}

Expand Down Expand Up @@ -98,13 +100,23 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error {
}()

for _, evt := range events {
// Set the primary keys if it's not set already.
if len(w.primaryKeys) == 0 {
var pks []string
for key := range evt.PrimaryKeyMap {
pks = append(pks, key)
}

w.primaryKeys = pks
}

shouldFlush, flushReason, err := evt.Save(w.cfg, w.inMemDB, w.tc, artie.Message{})
if err != nil {
return fmt.Errorf("failed to save event: %w", err)
}

if shouldFlush {
if err := w.flush(flushReason); err != nil {
if err = w.flush(flushReason); err != nil {
return err
}
}
Expand Down Expand Up @@ -154,7 +166,7 @@ func (w *Writer) flush(reason string) error {
}

tableData.ResetTempTableSuffix()
if err := w.destination.Append(tableData.TableData); err != nil {
if err = w.destination.Append(tableData.TableData); err != nil {
tags["what"] = "merge_fail"
tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err))
return fmt.Errorf("failed to append data to destination: %w", err)
Expand All @@ -164,6 +176,10 @@ func (w *Writer) flush(reason string) error {
}

func (w *Writer) OnComplete() error {
if len(w.primaryKeys) == 0 {
return fmt.Errorf("primary keys not set")
}

if err := w.flush("complete"); err != nil {
return err
}
Expand All @@ -176,7 +192,7 @@ func (w *Writer) OnComplete() error {
slog.Info("Running dedupe...", slog.String("table", tableName))
tableID := w.destination.IdentifierFor(*w.tc, tableName)
start := time.Now()
if err = w.destination.Dedupe(tableID); err != nil {
if err = w.destination.Dedupe(tableID, w.primaryKeys, *w.tc); err != nil {
return err
}
slog.Info("Dedupe complete", slog.String("table", tableName), slog.Duration("duration", time.Since(start)))
Expand Down
42 changes: 42 additions & 0 deletions writers/transfer/writer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package transfer

import (
"context"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/mocks"
"github.com/artie-labs/transfer/lib/cdc/util"
"testing"

transferCfg "github.com/artie-labs/transfer/lib/config"
Expand Down Expand Up @@ -45,3 +49,41 @@ func TestWriter_MessageToEvent(t *testing.T) {
"string": "Hello, world!",
}, evtOut.Data)
}

// TestWriter_Write pushes a batch of identical raw messages through
// Writer.Write and verifies that the writer's primary keys, which start out
// nil, are populated with the single entry "key" taken from the messages' key
// map.
func TestWriter_Write(t *testing.T) {
	const numMessages = 100

	// Every message is the same create ("c") event for table "table"; only the
	// key map {"key": "value"} matters for the primary-key assertions below.
	rawMsgs := make([]lib.RawMessage, 0, numMessages)
	for range numMessages {
		rawMsgs = append(rawMsgs, lib.NewRawMessage(
			"topic-suffix",
			map[string]any{"key": "value"},
			&util.SchemaEventPayload{
				Payload: util.Payload{
					After: map[string]any{"a": "b"},
					Source: util.Source{
						TsMs:  1000,
						Table: "table",
					},
					Operation: "c",
				},
			},
		))
	}

	writer, err := NewWriter(transferCfg.Config{
		Mode:   transferCfg.Replication,
		Output: "test",
		Kafka: &transferCfg.Kafka{
			TopicConfigs: []*kafkalib.TopicConfig{
				{
					TableName: "table",
				},
			},
		},
	}, &mocks.FakeClient{})
	if !assert.NoError(t, err) {
		// NewWriter returns a nil writer alongside an error; bail out so the
		// dereferences below do not panic and mask the real failure.
		return
	}

	// Nothing has been written yet, so no primary keys should be recorded.
	assert.Nil(t, writer.primaryKeys)
	assert.NoError(t, writer.Write(context.Background(), rawMsgs))

	// Guard the index access: a failed length assertion must not turn into an
	// out-of-range panic.
	if assert.Len(t, writer.primaryKeys, 1) {
		assert.Equal(t, "key", writer.primaryKeys[0])
	}
}