From 1d8864026b26272bb58612daa84204b1facf7a58 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 12 May 2024 17:39:32 -0700 Subject: [PATCH] Checkpoint. --- lib/cdc/mongo/debezium.go | 11 ++++--- lib/cdc/mongo/debezium_test.go | 50 ++++++++++++++++++++----------- lib/config/constants/constants.go | 1 - 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index cea3ac467..b7158c3b0 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -28,8 +28,7 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt return nil, fmt.Errorf("empty message") } - err := json.Unmarshal(bytes, &schemaEventPayload) - if err != nil { + if err := json.Unmarshal(bytes, &schemaEventPayload); err != nil { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } @@ -156,10 +155,14 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon // And we need to reconstruct the data bit since it will be empty. // We _can_ rely on *before* since even without running replicate identity, it will still copy over // the PK. We can explore simplifying this interface in the future by leveraging before. - retMap = map[string]any{ - constants.DeleteColumnMarker: true, + + if len(s.Payload.beforeMap) == 0 { + retMap = make(map[string]any) + } else { + retMap = s.Payload.beforeMap } + retMap[constants.DeleteColumnMarker] = true for k, v := range pkMap { retMap[k] = v } diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index eba0dc9a2..528a34f97 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -210,7 +210,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() { { "schema": {}, "payload": { - "before": null, + "before": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@example.com\", \"nested\": {\"object\": \"foo\"}}", "after": null, "patch": null, "filter": null, @@ -235,27 +235,41 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() { } } ` - evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) assert.NoError(p.T(), err) - evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) - assert.NoError(p.T(), err) - assert.Equal(p.T(), "customers123", evt.GetTableName()) - _, isOk := evtData[constants.UpdateColumnMarker] - assert.False(p.T(), isOk) - assert.Equal(p.T(), evtData["_id"], 1003) - assert.Equal(p.T(), evtData[constants.DeleteColumnMarker], true) - assert.Equal(p.T(), evt.GetExecutionTime(), - time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC)) - assert.Equal(p.T(), true, evt.DeletePayload()) + { + // Making sure the `before` payload is set. + evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + assert.NoError(p.T(), err) + assert.Equal(p.T(), "customers123", evt.GetTableName()) - evtData, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ - IncludeArtieUpdatedAt: true, - }) - assert.NoError(p.T(), err) - _, isOk = evtData[constants.UpdateColumnMarker] - assert.True(p.T(), isOk) + _, isOk := evtData[constants.UpdateColumnMarker] + assert.False(p.T(), isOk) + + expectedKeyToVal := map[string]any{ + "_id": 1003, + constants.DeleteColumnMarker: true, + "first_name": "Robin", + "email": "robin@example.com", + } + + for expectedKey, expectedVal := range expectedKeyToVal { + assert.Equal(p.T(), expectedVal, evtData[expectedKey], expectedKey) + } + assert.Equal(p.T(), evt.GetExecutionTime(), time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC)) + assert.Equal(p.T(), true, evt.DeletePayload()) + } + + { + // Check `__artie_updated_at` is included + evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ + IncludeArtieUpdatedAt: true, + }) + assert.NoError(p.T(), err) + _, isOk := evtData[constants.UpdateColumnMarker] + assert.True(p.T(), isOk) + } } func (p *MongoTestSuite) TestGetEventFromBytesTombstone() { diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index d886bf091..669f1bbf6 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -23,7 +23,6 @@ const ( TemporaryTableTTL = 6 * time.Hour - // DBZPostgresFormat is the only supported CDC format right now DBZPostgresFormat = "debezium.postgres" DBZPostgresAltFormat = "debezium.postgres.wal2json" DBZMongoFormat = "debezium.mongodb"