diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 1e365024..3df539ca 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -122,7 +122,8 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) } row := rows[0] - expectedPartitionKey := map[string]any{"payload": map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`}} + // This should not include the payload field in here. The payload field gets injected in [kafkalib.buildKafkaMessageWrapper] + expectedPartitionKey := map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`} expectedPkBytes, err := json.Marshal(expectedPartitionKey) if err != nil { return fmt.Errorf("failed to marshal expected partition key: %w", err) diff --git a/lib/mongo/message.go b/lib/mongo/message.go index a66dba30..bb1ebff3 100644 --- a/lib/mongo/message.go +++ b/lib/mongo/message.go @@ -36,13 +36,8 @@ func (m *Message) ToRawMessage(collection config.Collection, database string) (l Operation: m.operation, }, } - - pkMap := map[string]any{ - "payload": m.pkMap, - } - // MongoDB wouldn't include the schema. - return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, pkMap, evt), nil + return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, m.pkMap, evt), nil } func ParseMessage(after bson.M, before *bson.M, op string) (*Message, error) {