Skip to content

Commit

Permalink
[MongoDB] Return error instead of logging a warning on primary key presence in `retData` (#936)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 27, 2024
1 parent dbf812b commit 45ca0e9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 54 deletions.
13 changes: 4 additions & 9 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mongo
import (
"encoding/json"
"fmt"
"log/slog"
"reflect"
"time"

Expand All @@ -21,11 +20,11 @@ import (
type Debezium struct{}

func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var schemaEventPayload SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
}

var schemaEventPayload SchemaEventPayload
if err := json.Unmarshal(bytes, &schemaEventPayload); err != nil {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}
Expand Down Expand Up @@ -174,18 +173,14 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf
}
} else {
retMap = s.Payload.afterMap
// We need this because there's an edge case with Debezium
// Where _id gets rewritten as id in the partition key.
// TODO: Remove this code.
for key, value := range pkMap {
retData, isOk := retMap[key]
if !isOk {
slog.Warn("key not found in retMap", slog.String("key", key), slog.Any("retData", retData))
return nil, fmt.Errorf("key %q not found in data", key)
} else if retData != value {
slog.Warn("value mismatch", slog.String("key", key), slog.Any("value", value), slog.Any("retData", retData))
return nil, fmt.Errorf("value mismatch for key %q: expected %v, got %v", key, retData, value)
}

// TODO: Preserve behavior.
retMap[key] = value
}

retMap[constants.DeleteColumnMarker] = false
Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,21 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {

evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{})
evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(m.T(), isOk)
assert.Equal(m.T(), evtData["_id"], 1003)
assert.Equal(m.T(), evtData["_id"], int64(1003))
assert.Equal(m.T(), evtData["first_name"], "Robin")
assert.Equal(m.T(), evtData["last_name"], "Tang")
assert.Equal(m.T(), evtData["email"], "[email protected]")

evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{})
evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
_, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker]
assert.False(m.T(), isOk)

evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true})
evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true})
assert.NoError(m.T(), err)

assert.Equal(m.T(), ext.NewExtendedTime(time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC), ext.TimestampTzKindType, ext.ISO8601), evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker])
Expand Down
73 changes: 32 additions & 41 deletions processes/consumer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestProcessMessageFailures(t *testing.T) {
Format: &mgo,
})

vals := []string{
`{
val := `{
"schema": {
"type": "struct",
"fields": [{
Expand Down Expand Up @@ -135,7 +134,7 @@ func TestProcessMessageFailures(t *testing.T) {
},
"payload": {
"before": null,
"after": "{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}",
"after": "{\"_id\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}",
"patch": null,
"filter": null,
"updateDescription": null,
Expand All @@ -157,57 +156,49 @@ func TestProcessMessageFailures(t *testing.T) {
"ts_ms": 1668753329387,
"transaction": null
}
}`,
}
}`

idx := 0
memoryDB := memDB
for _, val := range vals {
idx += 1
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", idx))
if val != "" {
msg.KafkaMsg.Value = []byte(val)
}

args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), idx)
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004))
msg.KafkaMsg.Value = []byte(val)
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), 1)

var rowData map[string]any
for _, row := range td.Rows() {
if row["_id"] == "1" {
if row["_id"] == "1004" {
rowData = row
}
}

val, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, val.(bool))

msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
{
rowValue, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, rowValue.(bool))
}
{
msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
}
}

func TestProcessMessageSkip(t *testing.T) {
Expand Down

0 comments on commit 45ca0e9

Please sign in to comment.