From a73ae55fcbe795e568cd8b0b832d04989b1119fc Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 11:57:01 -0800 Subject: [PATCH 1/9] WIP. --- lib/kafkalib/message.go | 26 ++++++++---- lib/kafkalib/message_test.go | 13 ++++++ lib/kafkalib/writer.go | 10 +---- sources/postgres/adapter/transformer_test.go | 43 ++++++++++++++++---- writers/transfer/writer.go | 2 +- writers/writer_test.go | 6 +-- 6 files changed, 70 insertions(+), 30 deletions(-) create mode 100644 lib/kafkalib/message_test.go diff --git a/lib/kafkalib/message.go b/lib/kafkalib/message.go index eff5c0a8a..695c93ca0 100644 --- a/lib/kafkalib/message.go +++ b/lib/kafkalib/message.go @@ -1,6 +1,7 @@ package kafkalib import ( + "fmt" "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/debezium" ) @@ -8,29 +9,36 @@ import ( type Message struct { topicSuffix string partitionKeySchema debezium.FieldsObject - partitionKey map[string]any + partitionKeyValues map[string]any event cdc.Event } -func NewMessage(topicSuffix string, partitionKeySchema debezium.FieldsObject, partitionKey map[string]any, event cdc.Event) Message { +func NewMessage(topicSuffix string, partitionKeySchema debezium.FieldsObject, partitionKeyValues map[string]any, event cdc.Event) Message { return Message{ topicSuffix: topicSuffix, partitionKeySchema: partitionKeySchema, - partitionKey: partitionKey, + partitionKeyValues: partitionKeyValues, event: event, } } -func (r Message) TopicSuffix() string { - return r.topicSuffix +func (r Message) Topic(prefix string) string { + if prefix == "" { + return r.topicSuffix + } + + return fmt.Sprintf("%s.%s", prefix, r.topicSuffix) } -func (r Message) PartitionKey() map[string]any { - return r.partitionKey +func (r Message) PartitionKey() debezium.PrimaryKeyPayload { + return debezium.PrimaryKeyPayload{ + Schema: r.partitionKeySchema, + Payload: r.partitionKeyValues, + } } -func (r Message) PartitionKeySchema() debezium.FieldsObject { - return r.partitionKeySchema +func (r Message) PartitionKeyValues() map[string]any { + return r.partitionKeyValues } func (r Message) Event() cdc.Event { diff --git a/lib/kafkalib/message_test.go b/lib/kafkalib/message_test.go new file mode 100644 index 000000000..088c2c0e1 --- /dev/null +++ b/lib/kafkalib/message_test.go @@ -0,0 +1,13 @@ +package kafkalib + +import ( + "github.com/artie-labs/transfer/lib/debezium" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMessagePartitionKey(t *testing.T) { + msg := NewMessage("suffix", debezium.FieldsObject{}, nil, nil) + assert.Equal(t, "suffix", msg.Topic(""), "no prefix") + assert.Equal(t, "prefix.suffix", msg.Topic("prefix"), "with prefix") +} diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index f96ac8b87..c1913fe93 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -9,7 +9,6 @@ import ( "time" "github.com/artie-labs/transfer/lib/batch" - "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/retry" "github.com/artie-labs/transfer/lib/typing/columns" @@ -83,18 +82,13 @@ func buildKafkaMessageWrapper(topicPrefix string, rawMessage Message) (KafkaMess return KafkaMessageWrapper{}, err } - pk := debezium.PrimaryKeyPayload{ - Schema: rawMessage.PartitionKeySchema(), - Payload: rawMessage.PartitionKey(), - } - - keyBytes, err := json.Marshal(pk) + keyBytes, err := json.Marshal(rawMessage.PartitionKey()) if err != nil { return KafkaMessageWrapper{}, err } return KafkaMessageWrapper{ - Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), + Topic: rawMessage.Topic(topicPrefix), MessageKey: keyBytes, MessageValue: valueBytes, }, nil diff --git a/sources/postgres/adapter/transformer_test.go b/sources/postgres/adapter/transformer_test.go index f31ca40ad..a046a7662 100644 --- a/sources/postgres/adapter/transformer_test.go +++ b/sources/postgres/adapter/transformer_test.go @@ -2,9 +2,10 @@ package adapter import ( "fmt" - "github.com/artie-labs/transfer/lib/cdc/util" + "github.com/artie-labs/transfer/lib/debezium" "testing" + "github.com/artie-labs/transfer/lib/cdc/util" "github.com/stretchr/testify/assert" "github.com/artie-labs/reader/lib/debezium/converters" @@ -78,20 +79,44 @@ func TestDebeziumTransformer(t *testing.T) { msgs1 := results[0] assert.Len(t, msgs1, 2) - assert.Equal(t, "schema.table", msgs1[0].TopicSuffix()) - assert.Equal(t, map[string]any{"a": "1"}, msgs1[0].PartitionKey()) + assert.Equal(t, "schema.table", msgs1[0].Topic("")) + assert.Equal(t, + debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"a": "1"}, + }, + msgs1[0].PartitionKey(), + ) assert.Equal(t, map[string]any{"a": "1", "b": "11"}, msgs1[0].Event().(*util.SchemaEventPayload).Payload.After) - assert.Equal(t, "schema.table", msgs1[1].TopicSuffix()) - assert.Equal(t, map[string]any{"a": "2"}, msgs1[1].PartitionKey()) + assert.Equal(t, "schema.table", msgs1[1].Topic("")) + assert.Equal(t, + debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"a": "2"}, + }, + msgs1[1].PartitionKey(), + ) assert.Equal(t, map[string]any{"a": "2", "b": "12"}, msgs1[1].Event().(*util.SchemaEventPayload).Payload.After) msgs2 := results[1] assert.Len(t, msgs2, 2) - assert.Equal(t, "schema.table", msgs2[0].TopicSuffix()) - assert.Equal(t, map[string]any{"a": "3"}, msgs2[0].PartitionKey()) + assert.Equal(t, "schema.table", msgs2[0].Topic("")) + assert.Equal(t, + debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"a": "3"}, + }, + msgs2[0].PartitionKey(), + ) assert.Equal(t, map[string]any{"a": "3", "b": "13"}, msgs2[0].Event().(*util.SchemaEventPayload).Payload.After) - assert.Equal(t, "schema.table", msgs2[1].TopicSuffix()) - assert.Equal(t, map[string]any{"a": "4"}, msgs2[1].PartitionKey()) + assert.Equal(t, "schema.table", msgs2[1].Topic("")) + assert.Equal(t, + debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"a": "4"}, + }, + msgs2[1].PartitionKey(), + ) assert.Equal(t, map[string]any{"a": "4", "b": "14"}, msgs2[1].Event().(*util.SchemaEventPayload).Payload.After) } } diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index b7c201dff..43c4114e6 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -122,7 +122,7 @@ func (w *Writer) messageToEvent(message readerKafkaLib.Message) (event.Event, er return event.ToMemoryEvent(evt, partitionKey, w.tc, transferConfig.Replication) } - memoryEvent, err := event.ToMemoryEvent(evt, message.PartitionKey(), w.tc, transferConfig.Replication) + memoryEvent, err := event.ToMemoryEvent(evt, message.PartitionKeyValues(), w.tc, transferConfig.Replication) if err != nil { return event.Event{}, err } diff --git a/writers/writer_test.go b/writers/writer_test.go index 835267347..5bd05a77e 100644 --- a/writers/writer_test.go +++ b/writers/writer_test.go @@ -90,9 +90,9 @@ func TestWriter_Write(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 3, count) assert.Len(t, destination.messages, 3) - assert.Equal(t, destination.messages[0].TopicSuffix(), "a") - assert.Equal(t, destination.messages[1].TopicSuffix(), "b") - assert.Equal(t, destination.messages[2].TopicSuffix(), "c") + assert.Equal(t, destination.messages[0].Topic(""), "a") + assert.Equal(t, destination.messages[1].Topic(""), "b") + assert.Equal(t, destination.messages[2].Topic(""), "c") } { // Destination error From fbb352a126fe426ceb61a243d0daa2f9ba5ac3e2 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:19:03 -0800 Subject: [PATCH 2/9] Update. --- lib/debezium/transformer/transformer_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/debezium/transformer/transformer_test.go b/lib/debezium/transformer/transformer_test.go index e0b09dbe7..9f7d49bab 100644 --- a/lib/debezium/transformer/transformer_test.go +++ b/lib/debezium/transformer/transformer_test.go @@ -245,8 +245,14 @@ func TestDebeziumTransformer_Next(t *testing.T) { rows := results[0] assert.Len(t, rows, 1) rawMessage := rows[0] - assert.Equal(t, Row{"foo": "bar", "qux": 12}, rawMessage.PartitionKey()) - assert.Equal(t, "im-a-little-topic-suffix", rawMessage.TopicSuffix()) + assert.Equal(t, + debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: Row{"foo": "bar", "qux": 12}, + }, + rawMessage.PartitionKey(), + ) + assert.Equal(t, "im-a-little-topic-suffix", rawMessage.Topic("")) payload, isOk := rawMessage.Event().(*util.SchemaEventPayload) assert.True(t, isOk) payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now From 392a4285acddd8f3a07a9db7444d6c8889a007d1 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:29:19 -0800 Subject: [PATCH 3/9] Clean up. --- integration_tests/mssql/main.go | 15 ++++++++++++--- integration_tests/mysql/main.go | 27 +++++++++++++++++++++++---- integration_tests/postgres/main.go | 28 ++++++++++++++++++++++++---- integration_tests/utils/utils.go | 16 ++++++++++++++++ 4 files changed, 75 insertions(+), 11 deletions(-) diff --git a/integration_tests/mssql/main.go b/integration_tests/mssql/main.go index 6a652586e..42c24c7a5 100644 --- a/integration_tests/mssql/main.go +++ b/integration_tests/mssql/main.go @@ -6,8 +6,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/artie-labs/transfer/lib/debezium" "log/slog" - "maps" "os" "github.com/lmittmann/tint" @@ -199,8 +199,17 @@ func testTypes(db *sql.DB, dbName string) error { } row := rows[0] - expectedPartitionKey := map[string]any{"pk": int64(1)} - if !maps.Equal(row.PartitionKey(), expectedPartitionKey) { + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"pk": int64(1)}, + } + + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to check partition key difference: %w", err) + } + + if !equal { return fmt.Errorf("partition key %v does not match %v", row.PartitionKey(), expectedPartitionKey) } diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index 7ac269c6b..6447f082f 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/artie-labs/transfer/lib/debezium" "log/slog" - "maps" "os" "github.com/lmittmann/tint" @@ -649,8 +649,17 @@ func testTypes(db *sql.DB, dbName string) error { } row := rows[0] - expectedPartitionKey := map[string]any{"pk": int64(1)} - if !maps.Equal(row.PartitionKey(), expectedPartitionKey) { + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"pk": int64(1)}, + } + + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to check partition key difference: %w", err) + } + + if !equal { return fmt.Errorf("partition key %v does not match %v", row.PartitionKey(), expectedPartitionKey) } @@ -781,7 +790,17 @@ func testScan(db *sql.DB, dbName string) error { return fmt.Errorf("expected %d rows, got %d, batch size %d", len(expectedPartitionKeys), len(rows), batchSize) } for i, row := range rows { - if !maps.Equal(row.PartitionKey(), expectedPartitionKeys[i]) { + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: expectedPartitionKeys[i], + } + + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to check partition key difference: %w", err) + } + + if !equal { return fmt.Errorf("partition keys are different for row %d, batch size %d, %v != %v", i, batchSize, row.PartitionKey(), expectedPartitionKeys[i]) } textValue := utils.GetEvent(row).Payload.After["c_text_value"] diff --git a/integration_tests/postgres/main.go b/integration_tests/postgres/main.go index 7e3889a42..8c615cf1d 100644 --- a/integration_tests/postgres/main.go +++ b/integration_tests/postgres/main.go @@ -6,8 +6,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/artie-labs/transfer/lib/debezium" "log/slog" - "maps" "os" _ "github.com/jackc/pgx/v5/stdlib" @@ -846,10 +846,19 @@ func testTypes(db *sql.DB) error { if len(rows) != 1 { return fmt.Errorf("expected one row, got %d", len(rows)) } + row := rows[0] + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"pk": int64(1)}, + } + + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to check partition key difference: %w", err) + } - expectedPartitionKey := map[string]any{"pk": int64(1)} - if !maps.Equal(row.PartitionKey(), expectedPartitionKey) { + if !equal { return fmt.Errorf("partition key %v does not match %v", row.PartitionKey(), expectedPartitionKey) } @@ -980,9 +989,20 @@ func testScan(db *sql.DB) error { return fmt.Errorf("expected %d rows, got %d, batch size %d", len(expectedPartitionKeys), len(rows), batchSize) } for i, row := range rows { - if !maps.Equal(row.PartitionKey(), expectedPartitionKeys[i]) { + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: expectedPartitionKeys[i], + } + + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to check partition key difference: %w", err) + } + + if !equal { return fmt.Errorf("partition keys are different for row %d, batch size %d, %v != %v", i, batchSize, row.PartitionKey(), expectedPartitionKeys[i]) } + textValue := utils.GetEvent(row).Payload.After["c_text_value"] if textValue != expectedValues[i] { return fmt.Errorf("row values are different for row %d, batch size %d, %v != %v", i, batchSize, textValue, expectedValues[i]) diff --git a/integration_tests/utils/utils.go b/integration_tests/utils/utils.go index e509888ea..636fee6ee 100644 --- a/integration_tests/utils/utils.go +++ b/integration_tests/utils/utils.go @@ -2,7 +2,9 @@ package utils import ( "database/sql" + "encoding/json" "fmt" + "github.com/artie-labs/transfer/lib/debezium" "log/slog" "math/rand/v2" "strings" @@ -87,3 +89,17 @@ func CheckDifference(name, expected, actual string) bool { fmt.Println("--------------------------------------------------------------------------------") return true } + +func CheckPartitionKeyDifference(expected, actual debezium.PrimaryKeyPayload) (bool, error) { + expectedBytes, err := json.Marshal(expected) + if err != nil { + return false, fmt.Errorf("failed to marshal expected: %w", err) + } + + actualBytes, err := json.Marshal(actual) + if err != nil { + return false, fmt.Errorf("failed to marshal actual: %w", err) + } + + return string(expectedBytes) == string(actualBytes), nil +} From ed91aa7e531ba06d72554c725a5ac463fc790b80 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:30:53 -0800 Subject: [PATCH 4/9] Imports. --- integration_tests/mssql/main.go | 2 +- integration_tests/mysql/main.go | 2 +- integration_tests/postgres/main.go | 2 +- integration_tests/utils/utils.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/mssql/main.go b/integration_tests/mssql/main.go index 42c24c7a5..d95d3d986 100644 --- a/integration_tests/mssql/main.go +++ b/integration_tests/mssql/main.go @@ -6,10 +6,10 @@ import ( "encoding/json" "errors" "fmt" - "github.com/artie-labs/transfer/lib/debezium" "log/slog" "os" + "github.com/artie-labs/transfer/lib/debezium" "github.com/lmittmann/tint" _ "github.com/microsoft/go-mssqldb" diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index 6447f082f..07c2105be 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -5,10 +5,10 @@ import ( "encoding/json" "errors" "fmt" - "github.com/artie-labs/transfer/lib/debezium" "log/slog" "os" + "github.com/artie-labs/transfer/lib/debezium" "github.com/lmittmann/tint" "github.com/artie-labs/reader/config" diff --git a/integration_tests/postgres/main.go b/integration_tests/postgres/main.go index 8c615cf1d..b94576de0 100644 --- a/integration_tests/postgres/main.go +++ b/integration_tests/postgres/main.go @@ -6,10 +6,10 @@ import ( "encoding/json" "errors" "fmt" - "github.com/artie-labs/transfer/lib/debezium" "log/slog" "os" + "github.com/artie-labs/transfer/lib/debezium" _ "github.com/jackc/pgx/v5/stdlib" "github.com/lmittmann/tint" diff --git a/integration_tests/utils/utils.go b/integration_tests/utils/utils.go index 636fee6ee..fd9072b9b 100644 --- a/integration_tests/utils/utils.go +++ b/integration_tests/utils/utils.go @@ -4,13 +4,13 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/artie-labs/transfer/lib/debezium" "log/slog" "math/rand/v2" "strings" "github.com/artie-labs/transfer/lib/cdc/mongo" "github.com/artie-labs/transfer/lib/cdc/util" + "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/reader/lib/debezium/transformer" "github.com/artie-labs/reader/lib/kafkalib" From ddd8b2935a323d270caadaac0dc7bcc0eaca5193 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:42:35 -0800 Subject: [PATCH 5/9] Fix test. --- integration_tests/mongo/main.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 1f2c948ce..25348d7d7 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/artie-labs/transfer/lib/debezium" "log/slog" "math/rand/v2" "os" @@ -123,20 +124,18 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) } row := rows[0] - // This should not include the payload field in here. The payload field gets injected in [kafkalib.buildKafkaMessageWrapper] - expectedPartitionKey := map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`} - expectedPkBytes, err := json.Marshal(expectedPartitionKey) - if err != nil { - return fmt.Errorf("failed to marshal expected partition key: %w", err) + expectedPartitionKey := debezium.PrimaryKeyPayload{ + Schema: debezium.FieldsObject{}, + Payload: map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`}, } - actualPkBytes, err := json.Marshal(row.PartitionKey()) + equal, err := utils.CheckPartitionKeyDifference(expectedPartitionKey, row.PartitionKey()) if err != nil { - return fmt.Errorf("failed to marshal actual partition key: %w", err) + return fmt.Errorf("failed to check partition key difference: %w", err) } - if string(expectedPkBytes) != string(actualPkBytes) { - return fmt.Errorf("partition key %s does not match %s", actualPkBytes, expectedPkBytes) + if !equal { + return fmt.Errorf("partition key %v does not match %v", row.PartitionKey(), expectedPartitionKey) } mongoEvt := utils.GetMongoEvent(row) From a35ccf557c7a8f3637e042246335fd2cf0a8bee9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:45:09 -0800 Subject: [PATCH 6/9] Update. --- integration_tests/mongo/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 25348d7d7..3a2ffb6e0 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -162,7 +162,12 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) return fmt.Errorf("failed to get event from bytes: %w", err) } - pkMap, err := dbz.GetPrimaryKey(actualPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) + actualPartitionKeyBytes, err := json.Marshal(row.PartitionKey()) + if err != nil { + return fmt.Errorf("failed to marshal partition key: %w", err) + } + + pkMap, err := dbz.GetPrimaryKey(actualPartitionKeyBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) if err != nil { return fmt.Errorf("failed to get primary key: %w", err) } From 4aa51e9a0f445342a2193f856f63bc075831f23c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:58:41 -0800 Subject: [PATCH 7/9] Imports. --- integration_tests/mongo/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 3a2ffb6e0..4f1377edd 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/artie-labs/transfer/lib/debezium" "log/slog" "math/rand/v2" "os" @@ -14,6 +13,7 @@ import ( mongoLib "github.com/artie-labs/reader/sources/mongo" xferMongo "github.com/artie-labs/transfer/lib/cdc/mongo" + "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" From 417e9930af276402bfab98b322896387daf9e35c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 12:59:38 -0800 Subject: [PATCH 8/9] Imports. --- sources/postgres/adapter/transformer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sources/postgres/adapter/transformer_test.go b/sources/postgres/adapter/transformer_test.go index a046a7662..b673879a8 100644 --- a/sources/postgres/adapter/transformer_test.go +++ b/sources/postgres/adapter/transformer_test.go @@ -2,10 +2,10 @@ package adapter import ( "fmt" - "github.com/artie-labs/transfer/lib/debezium" "testing" "github.com/artie-labs/transfer/lib/cdc/util" + "github.com/artie-labs/transfer/lib/debezium" "github.com/stretchr/testify/assert" "github.com/artie-labs/reader/lib/debezium/converters" From 48eed3cfded571dcb67cf775969951e75282d9ca Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 2 Jan 2025 17:08:31 -0800 Subject: [PATCH 9/9] PR Feedback --- integration_tests/utils/utils.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration_tests/utils/utils.go b/integration_tests/utils/utils.go index fd9072b9b..12f2cff40 100644 --- a/integration_tests/utils/utils.go +++ b/integration_tests/utils/utils.go @@ -1,6 +1,7 @@ package utils import ( + "bytes" "database/sql" "encoding/json" "fmt" @@ -101,5 +102,5 @@ func CheckPartitionKeyDifference(expected, actual debezium.PrimaryKeyPayload) (b return false, fmt.Errorf("failed to marshal actual: %w", err) } - return string(expectedBytes) == string(actualBytes), nil + return bytes.Equal(expectedBytes, actualBytes), nil }