Skip to content

Commit

Permalink
Upgrade deps (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 28, 2024
1 parent 470b6f2 commit 19fa197
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 19 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ generate:
go get github.com/maxbrunsfeld/counterfeiter/v6
go generate ./...
go mod tidy

.PHONY: upgrade
upgrade:
go get github.com/artie-labs/transfer
go mod tidy
echo "Upgrade complete"
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.26.8
github.com/artie-labs/transfer v1.26.14
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.26.8 h1:aNhd4f3KwHOl0NsCwS1c4SJfU+CGveleqQgMgCAZG/0=
github.com/artie-labs/transfer v1.26.8/go.mod h1:BlYxzzlXGHOMNSgbpcjzw1zQSD/wXmb93NoPBhOmcqA=
github.com/artie-labs/transfer v1.26.14 h1:UncLdk74bwt8kVY+xRks8kIosS3EjNMitdu13/xahwI=
github.com/artie-labs/transfer v1.26.14/go.mod h1:+a/UhlQVRIpdz3muS1yhSvyX42RQL0LHOdovGZfEsDE=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/mongo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB)
return fmt.Errorf("failed to get event from bytes: %w", err)
}

pkMap, err := dbz.GetPrimaryKey(actualPkBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
pkMap, err := dbz.GetPrimaryKey(actualPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
if err != nil {
return fmt.Errorf("failed to get primary key: %w", err)
}

data, err := evt.GetData(pkMap, &kafkalib.TopicConfig{})
data, err := evt.GetData(pkMap, kafkalib.TopicConfig{})
if err != nil {
return fmt.Errorf("failed to get data: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/mongo/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestParseMessagePartitionKey(t *testing.T) {
assert.NoError(t, err)

var dbz transferMongo.Debezium
pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
assert.NoError(t, err)
assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"])
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestParseMessage(t *testing.T) {
assert.NoError(t, err)

var dbz transferMongo.Debezium
pkMap, err := dbz.GetPrimaryKey(rawPkBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
pkMap, err := dbz.GetPrimaryKey(rawPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
assert.NoError(t, err)

rawMsgBytes, err := json.Marshal(rawMsg.Event())
Expand All @@ -111,7 +111,7 @@ func TestParseMessage(t *testing.T) {
"nullValue": nil,
}

actualKVMap, err := kvMap.GetData(pkMap, &kafkalib.TopicConfig{})
actualKVMap, err := kvMap.GetData(pkMap, kafkalib.TopicConfig{})
assert.NoError(t, err)
for expectedKey, expectedVal := range expectedMap {
actualVal, isOk := actualKVMap[expectedKey]
Expand Down
6 changes: 3 additions & 3 deletions writers/transfer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Writer struct {
cfg config.Config
statsD mtr.Client
inMemDB *models.DatabaseData
tc *kafkalib.TopicConfig
tc kafkalib.TopicConfig
destination destination.Baseline

primaryKeys []string
Expand All @@ -46,7 +46,7 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) {
cfg: cfg,
statsD: statsD,
inMemDB: models.NewMemoryDB(),
tc: cfg.Kafka.TopicConfigs[0],
tc: *cfg.Kafka.TopicConfigs[0],
}

if utils.IsOutputBaseline(cfg) {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (w *Writer) OnComplete() error {
}

slog.Info("Running dedupe...", slog.String("table", tableName))
tableID := w.destination.IdentifierFor(*w.tc, tableName)
tableID := w.destination.IdentifierFor(w.tc, tableName)
start := time.Now()

dwh, isOk := w.destination.(destination.DataWarehouse)
Expand Down
8 changes: 2 additions & 6 deletions writers/transfer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ func TestWriter_MessageToEvent(t *testing.T) {
assert.NoError(t, err)

writer := Writer{
cfg: transferCfg.Config{
SharedTransferConfig: transferCfg.SharedTransferConfig{},
},
tc: &kafkalib.TopicConfig{
CDCKeyFormat: kafkalib.JSONKeyFmt,
},
cfg: transferCfg.Config{SharedTransferConfig: transferCfg.SharedTransferConfig{}},
tc: kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt},
}

evtOut, err := writer.messageToEvent(message)
Expand Down
7 changes: 5 additions & 2 deletions writers/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess
}
}

if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
// Only run [OnComplete] if we wrote messages out. Otherwise, primary keys may not be loaded.
if count > 0 {
if err := w.destinationWriter.OnComplete(); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
}

return count, nil
Expand Down

0 comments on commit 19fa197

Please sign in to comment.