Skip to content

Commit

Permalink
Fix MongoDB bug (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 16, 2024
1 parent 3c03abf commit 8a44105
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
8 changes: 6 additions & 2 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) {

func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConfig) (map[string]any, error) {
var retMap map[string]any
if len(s.Payload.afterMap) == 0 {

switch s.Operation() {
case "d":
// This is a delete event, so mark it as deleted.
// And we need to reconstruct the data bit since it will be empty.
// We _can_ rely on *before* since even without running replicate identity, it will still copy over
Expand All @@ -171,7 +173,7 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf
for k, v := range pkMap {
retMap[k] = v
}
} else {
case "r", "u", "c":
retMap = s.Payload.afterMap
// TODO: Remove this code.
for key, value := range pkMap {
Expand All @@ -185,6 +187,8 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf

retMap[constants.DeleteColumnMarker] = false
retMap[constants.OnlySetDeleteColumnMarker] = false
default:
return nil, fmt.Errorf("unknown operation: %q", s.Operation())
}

if tc.IncludeArtieUpdatedAt {
Expand Down
10 changes: 9 additions & 1 deletion lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() {
assert.False(m.T(), evt.DeletePayload())
}

func (m *MongoTestSuite) TestMongoDBEvent_DeletedRow() {
payload := `{"schema":{"type":"","fields":null},"payload":{"before":"{\"_id\":\"abc\"}","after":"{\"_id\":\"abc\"}","source":{"connector":"","ts_ms":1728784382733,"db":"foo","collection":"bar"},"op":"d"}}`
evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": "abc"}, kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
assert.True(m.T(), evtData[constants.DeleteColumnMarker].(bool))
}

func (m *MongoTestSuite) TestMongoDBEventCustomer() {
payload := `
{
Expand Down Expand Up @@ -151,7 +160,6 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {
}
}
`

evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})
Expand Down

0 comments on commit 8a44105

Please sign in to comment.