Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MongoDB] Return error instead of logging a warning on primary key presence in retData #936

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mongo
import (
"encoding/json"
"fmt"
"log/slog"
"reflect"
"time"

Expand All @@ -21,11 +20,11 @@ import (
type Debezium struct{}

func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var schemaEventPayload SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
}

var schemaEventPayload SchemaEventPayload
if err := json.Unmarshal(bytes, &schemaEventPayload); err != nil {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}
Expand Down Expand Up @@ -174,18 +173,14 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf
}
} else {
retMap = s.Payload.afterMap
// We need this because there's an edge case with Debezium
// Where _id gets rewritten as id in the partition key.
// TODO: Remove this code.
for key, value := range pkMap {
retData, isOk := retMap[key]
if !isOk {
slog.Warn("key not found in retMap", slog.String("key", key), slog.Any("retData", retData))
return nil, fmt.Errorf("key %q not found in data", key)
} else if retData != value {
slog.Warn("value mismatch", slog.String("key", key), slog.Any("value", value), slog.Any("retData", retData))
return nil, fmt.Errorf("value mismatch for key %q: expected %v, got %v", key, retData, value)
}

// TODO: Preserve behavior.
retMap[key] = value
}

retMap[constants.DeleteColumnMarker] = false
Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,21 @@ func (m *MongoTestSuite) TestMongoDBEventCustomer() {

evt, err := m.Debezium.GetEventFromBytes([]byte(payload))
assert.NoError(m.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{})
evtData, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(m.T(), isOk)
assert.Equal(m.T(), evtData["_id"], 1003)
assert.Equal(m.T(), evtData["_id"], int64(1003))
assert.Equal(m.T(), evtData["first_name"], "Robin")
assert.Equal(m.T(), evtData["last_name"], "Tang")
assert.Equal(m.T(), evtData["email"], "[email protected]")

evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{})
evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{})
assert.NoError(m.T(), err)
_, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker]
assert.False(m.T(), isOk)

evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true})
evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": int64(1003)}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true})
assert.NoError(m.T(), err)

assert.Equal(m.T(), ext.NewExtendedTime(time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC), ext.TimestampTzKindType, ext.ISO8601), evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker])
Expand Down
73 changes: 32 additions & 41 deletions processes/consumer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestProcessMessageFailures(t *testing.T) {
Format: &mgo,
})

vals := []string{
`{
val := `{
"schema": {
"type": "struct",
"fields": [{
Expand Down Expand Up @@ -135,7 +134,7 @@ func TestProcessMessageFailures(t *testing.T) {
},
"payload": {
"before": null,
"after": "{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}",
"after": "{\"_id\": \"1004\"},\"first_name\": \"Anne\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}",
"patch": null,
"filter": null,
"updateDescription": null,
Expand All @@ -157,57 +156,49 @@ func TestProcessMessageFailures(t *testing.T) {
"ts_ms": 1668753329387,
"transaction": null
}
}`,
}
}`

idx := 0
memoryDB := memDB
for _, val := range vals {
idx += 1
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", idx))
if val != "" {
msg.KafkaMsg.Value = []byte(val)
}

args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), idx)
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004))
msg.KafkaMsg.Value = []byte(val)
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), 1)

var rowData map[string]any
for _, row := range td.Rows() {
if row["_id"] == "1" {
if row["_id"] == "1004" {
rowData = row
}
}

val, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, val.(bool))

msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
{
rowValue, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, rowValue.(bool))
}
{
msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
}
}

func TestProcessMessageSkip(t *testing.T) {
Expand Down
Loading