diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 8cd4cbcdc..8916f3922 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -3,7 +3,6 @@ package mongo import ( "encoding/json" "fmt" - "log/slog" "reflect" "time" @@ -21,11 +20,11 @@ import ( type Debezium struct{} func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { - var schemaEventPayload SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") } + var schemaEventPayload SchemaEventPayload if err := json.Unmarshal(bytes, &schemaEventPayload); err != nil { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } @@ -174,18 +173,14 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf } } else { retMap = s.Payload.afterMap - // We need this because there's an edge case with Debezium - // Where _id gets rewritten as id in the partition key. + // TODO: Remove this code. for key, value := range pkMap { retData, isOk := retMap[key] if !isOk { - slog.Warn("key not found in retMap", slog.String("key", key), slog.Any("retData", retData)) + return nil, fmt.Errorf("key %q not found in data", key) } else if retData != value { - slog.Warn("value mismatch", slog.String("key", key), slog.Any("value", value), slog.Any("retData", retData)) + return nil, fmt.Errorf("value mismatch for key %q: expected %v, got %v", key, retData, value) } - - // TODO: Preserve behavior. - retMap[key] = value } retMap[constants.DeleteColumnMarker] = false diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index 3d829383f..87d262505 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -154,21 +154,21 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() { evt, err := m.Debezium.GetEventFromBytes([]byte(payload)) assert.NoError(m.T(), err) - evtData, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{}) + evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{}) assert.NoError(m.T(), err) _, isOk := evtData[constants.UpdateColumnMarker] assert.False(m.T(), isOk) - assert.Equal(m.T(), evtData["_id"], 1003) + assert.Equal(m.T(), evtData["_id"], int64(1003)) assert.Equal(m.T(), evtData["first_name"], "Robin") assert.Equal(m.T(), evtData["last_name"], "Tang") assert.Equal(m.T(), evtData["email"], "robin@example.com") - evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{}) + evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{}) assert.NoError(m.T(), err) _, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker] assert.False(m.T(), isOk) - evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true}) + evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true}) assert.NoError(m.T(), err) assert.Equal(m.T(), ext.NewExtendedTime(time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC), ext.TimestampTzKindType, ext.ISO8601), evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker]) diff --git a/processes/consumer/process_test.go b/processes/consumer/process_test.go index d7bba08a7..2d5f8301b 100644 --- a/processes/consumer/process_test.go +++ b/processes/consumer/process_test.go @@ -104,8 +104,7 @@ func TestProcessMessageFailures(t *testing.T) { Format: &mgo, }) - vals := []string{ - `{ + val := `{ "schema": { "type": "struct", "fields": [{ @@ -135,7 +134,7 @@ func TestProcessMessageFailures(t *testing.T) { }, "payload": { "before": null, - "after": "{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", + "after": "{\"_id\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", "patch": null, "filter": null, "updateDescription": null, @@ -157,57 +156,49 @@ func TestProcessMessageFailures(t *testing.T) { "ts_ms": 1668753329387, "transaction": null } -}`, - } +}` - idx := 0 memoryDB := memDB - for _, val := range vals { - idx += 1 - msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", idx)) - if val != "" { - msg.KafkaMsg.Value = []byte(val) - } - - args = processArgs{ - Msg: msg, - GroupID: "foo", - TopicToConfigFormatMap: tcFmtMap, - } - - tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) - assert.NoError(t, err) - assert.Equal(t, table, tableName) - - td := memoryDB.GetOrCreateTableData(table) - // Check that there are corresponding row(s) in the memory DB - assert.Len(t, td.Rows(), idx) + msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004)) + msg.KafkaMsg.Value = []byte(val) + args = processArgs{ + Msg: msg, + GroupID: "foo", + TopicToConfigFormatMap: tcFmtMap, } + tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) + assert.NoError(t, err) + assert.Equal(t, table, tableName) + td := memoryDB.GetOrCreateTableData(table) + // Check that there are corresponding row(s) in the memory DB + assert.Len(t, td.Rows(), 1) var rowData map[string]any for _, row := range td.Rows() { - if row["_id"] == "1" { + if row["_id"] == "1004" { rowData = row } } - - val, isOk := rowData[constants.DeleteColumnMarker] - assert.True(t, isOk) - assert.False(t, val.(bool)) - - msg.KafkaMsg.Value = []byte("not a json object") - args = processArgs{ - Msg: msg, - GroupID: "foo", - TopicToConfigFormatMap: tcFmtMap, + { + rowValue, isOk := rowData[constants.DeleteColumnMarker] + assert.True(t, isOk) + assert.False(t, rowValue.(bool)) } + { + msg.KafkaMsg.Value = []byte("not a json object") + args = processArgs{ + Msg: msg, + GroupID: "foo", + TopicToConfigFormatMap: tcFmtMap, + } - tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) - assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal") - assert.Empty(t, tableName) - assert.True(t, td.NumberOfRows() > 0) + tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) + assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal") + assert.Empty(t, tableName) + assert.True(t, td.NumberOfRows() > 0) + } } func TestProcessMessageSkip(t *testing.T) {