From 19fa197d54471702f3e6eb0b653646dd2c6c3680 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 28 Aug 2024 10:57:58 -0700 Subject: [PATCH] Upgrade deps (#479) --- Makefile | 6 ++++++ go.mod | 2 +- go.sum | 4 ++-- integration_tests/mongo/main.go | 4 ++-- lib/mongo/message_test.go | 6 +++--- writers/transfer/writer.go | 6 +++--- writers/transfer/writer_test.go | 8 ++------ writers/writer.go | 7 +++++-- 8 files changed, 24 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index d5599626..9c0d14b5 100644 --- a/Makefile +++ b/Makefile @@ -39,3 +39,9 @@ generate: go get github.com/maxbrunsfeld/counterfeiter/v6 go generate ./... go mod tidy + +.PHONY: upgrade +upgrade: + go get github.com/artie-labs/transfer + go mod tidy + echo "Upgrade complete" diff --git a/go.mod b/go.mod index 1ed1e8ee..ba7fb7d0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.26.8 + github.com/artie-labs/transfer v1.26.14 github.com/aws/aws-sdk-go-v2 v1.30.3 github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/credentials v1.17.27 diff --git a/go.sum b/go.sum index 17dbc8d5..ec17f0c9 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= -github.com/artie-labs/transfer v1.26.8 h1:aNhd4f3KwHOl0NsCwS1c4SJfU+CGveleqQgMgCAZG/0= -github.com/artie-labs/transfer v1.26.8/go.mod h1:BlYxzzlXGHOMNSgbpcjzw1zQSD/wXmb93NoPBhOmcqA= +github.com/artie-labs/transfer v1.26.14 h1:UncLdk74bwt8kVY+xRks8kIosS3EjNMitdu13/xahwI= +github.com/artie-labs/transfer v1.26.14/go.mod h1:+a/UhlQVRIpdz3muS1yhSvyX42RQL0LHOdovGZfEsDE= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts= github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 8581705a..8ecb031e 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -169,12 +169,12 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) return fmt.Errorf("failed to get event from bytes: %w", err) } - pkMap, err := dbz.GetPrimaryKey(actualPkBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) + pkMap, err := dbz.GetPrimaryKey(actualPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) if err != nil { return fmt.Errorf("failed to get primary key: %w", err) } - data, err := evt.GetData(pkMap, &kafkalib.TopicConfig{}) + data, err := evt.GetData(pkMap, kafkalib.TopicConfig{}) if err != nil { return fmt.Errorf("failed to get data: %w", err) } diff --git a/lib/mongo/message_test.go b/lib/mongo/message_test.go index 78317dd5..b0f32f02 100644 --- a/lib/mongo/message_test.go +++ b/lib/mongo/message_test.go @@ -31,7 +31,7 @@ func TestParseMessagePartitionKey(t *testing.T) { assert.NoError(t, err) var dbz transferMongo.Debezium - pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) + pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) assert.NoError(t, err) assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"]) } @@ -88,7 +88,7 @@ func TestParseMessage(t *testing.T) { assert.NoError(t, err) var dbz transferMongo.Debezium - pkMap, err := dbz.GetPrimaryKey(rawPkBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) + pkMap, err := dbz.GetPrimaryKey(rawPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) assert.NoError(t, err) rawMsgBytes, err := json.Marshal(rawMsg.Event()) @@ -111,7 +111,7 @@ func TestParseMessage(t *testing.T) { "nullValue": nil, } - actualKVMap, err := kvMap.GetData(pkMap, &kafkalib.TopicConfig{}) + actualKVMap, err := kvMap.GetData(pkMap, kafkalib.TopicConfig{}) assert.NoError(t, err) for expectedKey, expectedVal := range expectedMap { actualVal, isOk := actualKVMap[expectedKey] diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index a7b26888..717df136 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -27,7 +27,7 @@ type Writer struct { cfg config.Config statsD mtr.Client inMemDB *models.DatabaseData - tc *kafkalib.TopicConfig + tc kafkalib.TopicConfig destination destination.Baseline primaryKeys []string @@ -46,7 +46,7 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) { cfg: cfg, statsD: statsD, inMemDB: models.NewMemoryDB(), - tc: cfg.Kafka.TopicConfigs[0], + tc: *cfg.Kafka.TopicConfigs[0], } if utils.IsOutputBaseline(cfg) { @@ -241,7 +241,7 @@ func (w *Writer) OnComplete() error { } slog.Info("Running dedupe...", slog.String("table", tableName)) - tableID := w.destination.IdentifierFor(*w.tc, tableName) + tableID := w.destination.IdentifierFor(w.tc, tableName) start := time.Now() dwh, isOk := w.destination.(destination.DataWarehouse) diff --git a/writers/transfer/writer_test.go b/writers/transfer/writer_test.go index 0618a2c1..7617d712 100644 --- a/writers/transfer/writer_test.go +++ b/writers/transfer/writer_test.go @@ -29,12 +29,8 @@ func TestWriter_MessageToEvent(t *testing.T) { assert.NoError(t, err) writer := Writer{ - cfg: transferCfg.Config{ - SharedTransferConfig: transferCfg.SharedTransferConfig{}, - }, - tc: &kafkalib.TopicConfig{ - CDCKeyFormat: kafkalib.JSONKeyFmt, - }, + cfg: transferCfg.Config{SharedTransferConfig: transferCfg.SharedTransferConfig{}}, + tc: kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}, } evtOut, err := writer.messageToEvent(message) diff --git a/writers/writer.go b/writers/writer.go index f740ca99..38013990 100644 --- a/writers/writer.go +++ b/writers/writer.go @@ -55,8 +55,11 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess } } - if err := w.destinationWriter.OnComplete(); err != nil { - return 0, fmt.Errorf("failed running destination OnComplete: %w", err) + // Only run [OnComplete] if we wrote messages out. Otherwise, primary keys may not be loaded. + if count > 0 { + if err := w.destinationWriter.OnComplete(); err != nil { + return 0, fmt.Errorf("failed running destination OnComplete: %w", err) + } } return count, nil