diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 8916f3922..59822b383 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -152,7 +152,9 @@ func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConfig) (map[string]any, error) { var retMap map[string]any - if len(s.Payload.afterMap) == 0 { + + switch s.Operation() { + case "d": // This is a delete event, so mark it as deleted. // And we need to reconstruct the data bit since it will be empty. // We _can_ rely on *before* since even without running replicate identity, it will still copy over @@ -171,7 +173,7 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf for k, v := range pkMap { retMap[k] = v } - } else { + case "r", "u", "c": retMap = s.Payload.afterMap // TODO: Remove this code. for key, value := range pkMap { @@ -185,6 +187,8 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf retMap[constants.DeleteColumnMarker] = false retMap[constants.OnlySetDeleteColumnMarker] = false + default: + return nil, fmt.Errorf("unknown operation: %q", s.Operation()) } if tc.IncludeArtieUpdatedAt { diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index 87d262505..926fa23c9 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -121,6 +121,15 @@ func (m *MongoTestSuite) TestMongoDBEventOrder() { assert.False(m.T(), evt.DeletePayload()) } +func (m *MongoTestSuite) TestMongoDBEvent_DeletedRow() { + payload := `{"schema":{"type":"","fields":null},"payload":{"before":"{\"_id\":\"abc\"}","after":"{\"_id\":\"abc\"}","source":{"connector":"","ts_ms":1728784382733,"db":"foo","collection":"bar"},"op":"d"}}` + evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) + assert.NoError(m.T(), err) + evtData, err := evt.GetData(map[string]any{"_id": "abc"}, kafkalib.TopicConfig{}) + assert.NoError(m.T(), err) + assert.True(m.T(), evtData[constants.DeleteColumnMarker].(bool)) +} + func (m *MongoTestSuite) TestMongoDBEventCustomer() { payload := ` { @@ -151,7 +160,6 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { } } ` - evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})