Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 30, 2024
1 parent df45b84 commit ade10cf
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package mongo
import (
"encoding/json"
"fmt"
"log/slog"
"reflect"
"time"

"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/debezium"
Expand Down Expand Up @@ -172,6 +175,12 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf
for k, v := range pkMap {
retMap[k] = v
}

if _, isOk := retMap["ts"]; !isOk {
slog.Info("ts not found in data, adding it", slog.String("table", s.GetTableName()))
objectID := retMap["_id"].(primitive.ObjectID)
retMap["ts"] = objectID.Timestamp()
}
case "r", "u", "c":
retMap = s.Payload.afterMap
// TODO: Remove this code.
Expand Down

0 comments on commit ade10cf

Please sign in to comment.