From 2a3ebe4794bb95a9ddd9657a44ceac2fad0db5ce Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 19 Dec 2024 16:19:49 -0800 Subject: [PATCH] [MySQL] Include GTID in Debezium Source (#624) --- sources/mysql/streaming/converter.go | 3 ++- sources/mysql/streaming/dml.go | 4 ++-- sources/mysql/streaming/iterator.go | 6 ++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sources/mysql/streaming/converter.go b/sources/mysql/streaming/converter.go index 1fb3310a..72622037 100644 --- a/sources/mysql/streaming/converter.go +++ b/sources/mysql/streaming/converter.go @@ -135,7 +135,7 @@ func splitIntoBeforeAndAfter(operation string, rows [][]any) (iter.Seq2[[]any, [ } } -func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, position Position) util.Source { +func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, position Position, currentGTID *string) util.Source { return util.Source{ Connector: "mysql", Database: dbName, @@ -145,5 +145,6 @@ func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, p // MySQL specific File: position.File, Pos: int64(position.Pos), + Gtid: currentGTID, } } diff --git a/sources/mysql/streaming/dml.go b/sources/mysql/streaming/dml.go index 669f6dd1..6afc2fcf 100644 --- a/sources/mysql/streaming/dml.go +++ b/sources/mysql/streaming/dml.go @@ -13,7 +13,7 @@ import ( "github.com/artie-labs/reader/lib/debezium/transformer" ) -func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]lib.RawMessage, error) { +func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent, currentGTID *string) ([]lib.RawMessage, error) { rowsEvent, err := typing.AssertType[*replication.RowsEvent](event.Event) if err != nil { return nil, fmt.Errorf("failed to assert a rows event: %w", err) @@ -63,7 +63,7 @@ func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]l return nil, fmt.Errorf("failed to get parsed columns: %w", err) } - sourcePayload := buildDebeziumSourcePayload(i.cfg.Database, tableName, ts, i.position) + sourcePayload := buildDebeziumSourcePayload(i.cfg.Database, tableName, ts, i.position, currentGTID) dbz := transformer.NewLightDebeziumTransformer(tableName, tblAdapter.PartitionKeys(), tblAdapter.GetFieldConverters()) for before, after := range beforeAndAfters { var beforeRow map[string]any diff --git a/sources/mysql/streaming/iterator.go b/sources/mysql/streaming/iterator.go index e16a922a..a0301445 100644 --- a/sources/mysql/streaming/iterator.go +++ b/sources/mysql/streaming/iterator.go @@ -145,6 +145,7 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) { defer cancel() var rawMsgs []lib.RawMessage + var currentGTID *string for i.batchSize > int32(len(rawMsgs)) { select { case <-ctx.Done(): @@ -165,7 +166,8 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) { return nil, fmt.Errorf("failed to retrieve next GTID set: %w", err) } - shouldProcess, err := mysql.ShouldProcessRow(i.position._gtidSet, next.String()) + currentGTID = typing.ToPtr(next.String()) + shouldProcess, err := mysql.ShouldProcessRow(i.position._gtidSet, *currentGTID) if err != nil { return nil, fmt.Errorf("failed to check if we should process row: %w", err) } @@ -204,7 +206,7 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) { return nil, fmt.Errorf("failed to persist DDL: %w", err) } case replication.WRITE_ROWS_EVENTv2, replication.UPDATE_ROWS_EVENTv2, replication.DELETE_ROWS_EVENTv2: - rows, err := i.processDML(ts, event) + rows, err := i.processDML(ts, event, currentGTID) if err != nil { return nil, fmt.Errorf("failed to process DML: %w", err) }