Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 19, 2024
1 parent 2220047 commit 616e7d3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion sources/mysql/streaming/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func splitIntoBeforeAndAfter(operation string, rows [][]any) (iter.Seq2[[]any, [
}
}

func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, position Position) util.Source {
func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, position Position, currentGTID *string) util.Source {
return util.Source{
Connector: "mysql",
Database: dbName,
Expand All @@ -145,5 +145,6 @@ func buildDebeziumSourcePayload(dbName string, tableName string, ts time.Time, p
// MySQL specific
File: position.File,
Pos: int64(position.Pos),
Gtid: currentGTID,
}
}
4 changes: 2 additions & 2 deletions sources/mysql/streaming/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/artie-labs/reader/lib/debezium/transformer"
)

func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]lib.RawMessage, error) {
func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent, currentGTID *string) ([]lib.RawMessage, error) {
rowsEvent, err := typing.AssertType[*replication.RowsEvent](event.Event)
if err != nil {
return nil, fmt.Errorf("failed to assert a rows event: %w", err)
Expand Down Expand Up @@ -63,7 +63,7 @@ func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]l
return nil, fmt.Errorf("failed to get parsed columns: %w", err)
}

sourcePayload := buildDebeziumSourcePayload(i.cfg.Database, tableName, ts, i.position)
sourcePayload := buildDebeziumSourcePayload(i.cfg.Database, tableName, ts, i.position, currentGTID)
dbz := transformer.NewLightDebeziumTransformer(tableName, tblAdapter.PartitionKeys(), tblAdapter.GetFieldConverters())
for before, after := range beforeAndAfters {
var beforeRow map[string]any
Expand Down
6 changes: 4 additions & 2 deletions sources/mysql/streaming/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) {
defer cancel()

var rawMsgs []lib.RawMessage
var currentGTID *string
for i.batchSize > int32(len(rawMsgs)) {
select {
case <-ctx.Done():
Expand All @@ -165,7 +166,8 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to retrieve next GTID set: %w", err)
}

shouldProcess, err := mysql.ShouldProcessRow(i.position._gtidSet, next.String())
currentGTID = typing.ToPtr(next.String())
shouldProcess, err := mysql.ShouldProcessRow(i.position._gtidSet, *currentGTID)
if err != nil {
return nil, fmt.Errorf("failed to check if we should process row: %w", err)
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to persist DDL: %w", err)
}
case replication.WRITE_ROWS_EVENTv2, replication.UPDATE_ROWS_EVENTv2, replication.DELETE_ROWS_EVENTv2:
rows, err := i.processDML(ts, event)
rows, err := i.processDML(ts, event, currentGTID)
if err != nil {
return nil, fmt.Errorf("failed to process DML: %w", err)
}
Expand Down

0 comments on commit 616e7d3

Please sign in to comment.