From 0cfeb12d4831c27714b62d635b4280e41f42b201 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Sep 2024 16:48:40 -0700 Subject: [PATCH] [MongoDB] Adding warning before removing (#924) --- lib/cdc/mongo/debezium.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 72850d766..c5f07b451 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -3,6 +3,7 @@ package mongo import ( "encoding/json" "fmt" + "log/slog" "reflect" "time" @@ -175,8 +176,16 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf retMap = s.Payload.afterMap // We need this because there's an edge case with Debezium // Where _id gets rewritten as id in the partition key. - for k, v := range pkMap { - retMap[k] = v + for key, value := range pkMap { + retData, isOk := retMap[key] + if !isOk { + slog.Warn("key not found in retMap", slog.String("key", key), slog.Any("retData", retData)) + } else if retData != value { + slog.Warn("value mismatch", slog.String("key", key), slog.Any("value", value), slog.Any("retData", retData)) + } + + // TODO: Preserve behavior. + retMap[key] = value } retMap[constants.DeleteColumnMarker] = false