From 737623f5322be19fe21d22fb57bb11b29af4fea9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Dec 2024 14:33:07 -0800 Subject: [PATCH 1/3] Clean up. --- lib/mongo/message.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/mongo/message.go b/lib/mongo/message.go index a66dba30..bb1ebff3 100644 --- a/lib/mongo/message.go +++ b/lib/mongo/message.go @@ -36,13 +36,8 @@ func (m *Message) ToRawMessage(collection config.Collection, database string) (l Operation: m.operation, }, } - - pkMap := map[string]any{ - "payload": m.pkMap, - } - // MongoDB wouldn't include the schema. - return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, pkMap, evt), nil + return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, m.pkMap, evt), nil } func ParseMessage(after bson.M, before *bson.M, op string) (*Message, error) { From c631dffd1c4f4d4d4cd0ba0f1dbc55a1adb902e7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Dec 2024 14:43:42 -0800 Subject: [PATCH 2/3] Fix integration tests --- integration_tests/mongo/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 1e365024..f49897c2 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -122,7 +122,7 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) } row := rows[0] - expectedPartitionKey := map[string]any{"payload": map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`}} + expectedPartitionKey := map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`} expectedPkBytes, err := json.Marshal(expectedPartitionKey) if err != nil { return fmt.Errorf("failed to marshal expected partition key: %w", err) From 1d461983c2e115d478cfa35c85c30d9c0dc4d2d9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Dec 2024 14:47:12 -0800 Subject: [PATCH 3/3] Update comment. --- integration_tests/mongo/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index f49897c2..3df539ca 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -122,6 +122,7 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) } row := rows[0] + // This should not include the payload field in here. The payload field gets injected in [kafkalib.buildKafkaMessageWrapper] expectedPartitionKey := map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`} expectedPkBytes, err := json.Marshal(expectedPartitionKey) if err != nil {