Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 13, 2024
1 parent eee74c7 commit 1d88640
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 23 deletions.
11 changes: 7 additions & 4 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt
return nil, fmt.Errorf("empty message")
}

err := json.Unmarshal(bytes, &schemaEventPayload)
if err != nil {
if err := json.Unmarshal(bytes, &schemaEventPayload); err != nil {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}

Expand Down Expand Up @@ -156,10 +155,14 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon
// And we need to reconstruct the data bit since it will be empty.
// We _can_ rely on *before* since even without replica identity being set, it will still copy over
// the PK. We can explore simplifying this interface in the future by leveraging before.
retMap = map[string]any{
constants.DeleteColumnMarker: true,

if len(s.Payload.beforeMap) == 0 {
retMap = make(map[string]any)
} else {
retMap = s.Payload.beforeMap
}

retMap[constants.DeleteColumnMarker] = true
for k, v := range pkMap {
retMap[k] = v
}
Expand Down
50 changes: 32 additions & 18 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() {
{
"schema": {},
"payload": {
"before": null,
"before": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"[email protected]\", \"nested\": {\"object\": \"foo\"}}",
"after": null,
"patch": null,
"filter": null,
Expand All @@ -235,27 +235,41 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() {
}
}
`

evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
assert.NoError(p.T(), err)
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
assert.Equal(p.T(), "customers123", evt.GetTableName())
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(p.T(), isOk)
assert.Equal(p.T(), evtData["_id"], 1003)
assert.Equal(p.T(), evtData[constants.DeleteColumnMarker], true)
assert.Equal(p.T(), evt.GetExecutionTime(),
time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC))
assert.Equal(p.T(), true, evt.DeletePayload())
{
// Making sure the `before` payload is set.
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
assert.Equal(p.T(), "customers123", evt.GetTableName())

evtData, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
IncludeArtieUpdatedAt: true,
})
assert.NoError(p.T(), err)
_, isOk = evtData[constants.UpdateColumnMarker]
assert.True(p.T(), isOk)
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(p.T(), isOk)

expectedKeyToVal := map[string]any{
"_id": 1003,
constants.DeleteColumnMarker: true,
"first_name": "Robin",
"email": "[email protected]",
}

for expectedKey, expectedVal := range expectedKeyToVal {
assert.Equal(p.T(), expectedVal, evtData[expectedKey], expectedKey)
}

assert.Equal(p.T(), evt.GetExecutionTime(), time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC))
assert.Equal(p.T(), true, evt.DeletePayload())
}

{
// Check `__artie_updated_at` is included
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
IncludeArtieUpdatedAt: true,
})
assert.NoError(p.T(), err)
_, isOk := evtData[constants.UpdateColumnMarker]
assert.True(p.T(), isOk)
}
}

func (p *MongoTestSuite) TestGetEventFromBytesTombstone() {
Expand Down
1 change: 0 additions & 1 deletion lib/config/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const (

TemporaryTableTTL = 6 * time.Hour

// DBZPostgresFormat is the only supported CDC format right now
DBZPostgresFormat = "debezium.postgres"
DBZPostgresAltFormat = "debezium.postgres.wal2json"
DBZMongoFormat = "debezium.mongodb"
Expand Down

0 comments on commit 1d88640

Please sign in to comment.