Skip to content

Commit

Permalink
Support MySQL (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Mar 13, 2023
1 parent 3fa3256 commit 683ecad
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 216 deletions.
8 changes: 5 additions & 3 deletions lib/cdc/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"context"
"github.com/artie-labs/transfer/lib/cdc/mysql"

"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/cdc/mongo"
Expand All @@ -10,13 +11,14 @@ import (
)

var (
d postgres.Debezium
m mongo.Debezium
d postgres.Debezium
m mongo.Debezium
mySQL mysql.Debezium
)

func GetFormatParser(ctx context.Context, label string) cdc.Format {
validFormats := []cdc.Format{
&d, &m,
&d, &m, &mySQL,
}

for _, validFormat := range validFormats {
Expand Down
45 changes: 45 additions & 0 deletions lib/cdc/mysql/debezium.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mysql

import (
"context"
"encoding/json"
"fmt"
"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
)

type Debezium string

func (d *Debezium) GetEventFromBytes(ctx context.Context, bytes []byte) (cdc.Event, error) {
var event util.SchemaEventPayload
if len(bytes) == 0 {
// This is a Kafka Tombstone event.
return &event, nil
}

err := json.Unmarshal(bytes, &event)
if err != nil {
return nil, err
}

return &event, nil
}

func (d *Debezium) Labels() []string {
return []string{constants.DBZMySQLFormat}
}

func (d *Debezium) GetPrimaryKey(ctx context.Context, key []byte, tc *kafkalib.TopicConfig) (pkName string, pkValue interface{}, err error) {
switch tc.CDCKeyFormat {
case "org.apache.kafka.connect.json.JsonConverter":
return util.ParseJSONKey(key)
case "org.apache.kafka.connect.storage.StringConverter":
return util.ParseStringKey(key)
default:
err = fmt.Errorf("format: %s is not supported", tc.CDCKeyFormat)
}

return
}
20 changes: 20 additions & 0 deletions lib/cdc/mysql/debezium_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mysql

import (
"github.com/stretchr/testify/suite"
"testing"
)

type MySQLTestSuite struct {
suite.Suite
*Debezium
}

func (m *MySQLTestSuite) SetupTest() {
var debezium Debezium
m.Debezium = &debezium
}

func TestPostgresTestSuite(t *testing.T) {
suite.Run(t, new(MySQLTestSuite))
}
297 changes: 297 additions & 0 deletions lib/cdc/mysql/debezium_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
package mysql

import (
"context"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/stretchr/testify/assert"
"time"
)

func (m *MySQLTestSuite) TestGetEventFromBytes() {
payload := `
{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "first_name"
}, {
"type": "string",
"optional": false,
"field": "last_name"
}, {
"type": "string",
"optional": false,
"field": "email"
}, {
"type": "boolean",
"optional": true,
"field": "boolean_test"
}, {
"type": "boolean",
"optional": true,
"field": "bool_test"
}, {
"type": "int16",
"optional": true,
"field": "tinyint_test"
}, {
"type": "int16",
"optional": true,
"field": "smallint_test"
}, {
"type": "int32",
"optional": true,
"field": "mediumint_test"
}, {
"type": "int32",
"optional": true,
"field": "int_test"
}, {
"type": "int32",
"optional": true,
"field": "integer_test"
}, {
"type": "int32",
"optional": true,
"field": "int_x_test"
}, {
"type": "int64",
"optional": true,
"field": "big_int_test"
}],
"optional": true,
"name": "mysql1.inventory.customers.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "first_name"
}, {
"type": "string",
"optional": false,
"field": "last_name"
}, {
"type": "string",
"optional": false,
"field": "email"
}, {
"type": "boolean",
"optional": true,
"field": "boolean_test"
}, {
"type": "boolean",
"optional": true,
"field": "bool_test"
}, {
"type": "int16",
"optional": true,
"field": "tinyint_test"
}, {
"type": "int16",
"optional": true,
"field": "smallint_test"
}, {
"type": "int32",
"optional": true,
"field": "mediumint_test"
}, {
"type": "int32",
"optional": true,
"field": "int_test"
}, {
"type": "int32",
"optional": true,
"field": "integer_test"
}, {
"type": "int32",
"optional": true,
"field": "int_x_test"
}, {
"type": "int64",
"optional": true,
"field": "big_int_test"
}],
"optional": true,
"name": "mysql1.inventory.customers.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": true,
"field": "sequence"
}, {
"type": "string",
"optional": true,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "server_id"
}, {
"type": "string",
"optional": true,
"field": "gtid"
}, {
"type": "string",
"optional": false,
"field": "file"
}, {
"type": "int64",
"optional": false,
"field": "pos"
}, {
"type": "int32",
"optional": false,
"field": "row"
}, {
"type": "int64",
"optional": true,
"field": "thread"
}, {
"type": "string",
"optional": true,
"field": "query"
}],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}],
"optional": false,
"name": "mysql1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "[email protected]",
"boolean_test": true,
"bool_test": false,
"tinyint_test": 1,
"smallint_test": 2,
"mediumint_test": 3,
"int_test": 4,
"integer_test": 5,
"int_x_test": 6,
"big_int_test": 9223372036854775806
},
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "[email protected]",
"boolean_test": true,
"bool_test": false,
"tinyint_test": 1,
"smallint_test": 2,
"mediumint_test": 3,
"int_test": 4,
"integer_test": 5,
"int_x_test": 7,
"big_int_test": 9223372036854775806
},
"source": {
"version": "2.0.1.Final",
"connector": "mysql",
"name": "mysql1",
"ts_ms": 1678735164000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 3723,
"row": 0,
"thread": 12,
"query": null
},
"op": "u",
"ts_ms": 1678735164638,
"transaction": null
}
}`
evt, err := m.Debezium.GetEventFromBytes(context.Background(), []byte(payload))
assert.NoError(m.T(), err)
assert.Equal(m.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime())

evtData := evt.GetData(context.Background(), "id", 1001, &kafkalib.TopicConfig{})
assert.Equal(m.T(), evtData["id"], 1001)
assert.Equal(m.T(), evtData["first_name"], "Sally")
assert.Equal(m.T(), evtData["bool_test"], false)

}
Loading

0 comments on commit 683ecad

Please sign in to comment.