diff --git a/lib/cdc/format/format.go b/lib/cdc/format/format.go index 7c3ed503e..3cc3e99f8 100644 --- a/lib/cdc/format/format.go +++ b/lib/cdc/format/format.go @@ -2,6 +2,7 @@ package format import ( "context" + "github.com/artie-labs/transfer/lib/cdc/mysql" "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/cdc/mongo" @@ -10,13 +11,14 @@ import ( ) var ( - d postgres.Debezium - m mongo.Debezium + d postgres.Debezium + m mongo.Debezium + mySQL mysql.Debezium ) func GetFormatParser(ctx context.Context, label string) cdc.Format { validFormats := []cdc.Format{ - &d, &m, + &d, &m, &mySQL, } for _, validFormat := range validFormats { diff --git a/lib/cdc/mysql/debezium.go b/lib/cdc/mysql/debezium.go new file mode 100644 index 000000000..9c5ad2507 --- /dev/null +++ b/lib/cdc/mysql/debezium.go @@ -0,0 +1,45 @@ +package mysql + +import ( + "context" + "encoding/json" + "fmt" + "github.com/artie-labs/transfer/lib/cdc" + "github.com/artie-labs/transfer/lib/cdc/util" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/kafkalib" +) + +type Debezium string + +func (d *Debezium) GetEventFromBytes(ctx context.Context, bytes []byte) (cdc.Event, error) { + var event util.SchemaEventPayload + if len(bytes) == 0 { + // This is a Kafka Tombstone event. + return &event, nil + } + + err := json.Unmarshal(bytes, &event) + if err != nil { + return nil, err + } + + return &event, nil +} + +func (d *Debezium) Labels() []string { + return []string{constants.DBZMySQLFormat} +} + +func (d *Debezium) GetPrimaryKey(ctx context.Context, key []byte, tc *kafkalib.TopicConfig) (pkName string, pkValue interface{}, err error) { + switch tc.CDCKeyFormat { + case "org.apache.kafka.connect.json.JsonConverter": + return util.ParseJSONKey(key) + case "org.apache.kafka.connect.storage.StringConverter": + return util.ParseStringKey(key) + default: + err = fmt.Errorf("format: %s is not supported", tc.CDCKeyFormat) + } + + return +} diff --git a/lib/cdc/mysql/debezium_suite_test.go b/lib/cdc/mysql/debezium_suite_test.go new file mode 100644 index 000000000..eae390d89 --- /dev/null +++ b/lib/cdc/mysql/debezium_suite_test.go @@ -0,0 +1,20 @@ +package mysql + +import ( + "github.com/stretchr/testify/suite" + "testing" +) + +type MySQLTestSuite struct { + suite.Suite + *Debezium +} + +func (m *MySQLTestSuite) SetupTest() { + var debezium Debezium + m.Debezium = &debezium +} + +func TestPostgresTestSuite(t *testing.T) { + suite.Run(t, new(MySQLTestSuite)) +} diff --git a/lib/cdc/mysql/debezium_test.go b/lib/cdc/mysql/debezium_test.go new file mode 100644 index 000000000..66872fab3 --- /dev/null +++ b/lib/cdc/mysql/debezium_test.go @@ -0,0 +1,297 @@ +package mysql + +import ( + "context" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/stretchr/testify/assert" + "time" +) + +func (m *MySQLTestSuite) TestGetEventFromBytes() { + payload := ` +{ + "schema": { + "type": "struct", + "fields": [{ + "type": "struct", + "fields": [{ + "type": "int32", + "optional": false, + "field": "id" + }, { + "type": "string", + "optional": false, + "field": "first_name" + }, { + "type": "string", + "optional": false, + "field": "last_name" + }, { + "type": "string", + "optional": false, + "field": "email" + }, { + "type": "boolean", + "optional": true, + "field": "boolean_test" + }, { + "type": "boolean", + "optional": true, + "field": "bool_test" + }, { + "type": "int16", + "optional": true, + "field": "tinyint_test" + }, { + "type": "int16", + "optional": true, + "field": "smallint_test" + }, { + "type": "int32", + "optional": true, + "field": "mediumint_test" + }, { + "type": "int32", + "optional": true, + "field": "int_test" + }, { + "type": "int32", + "optional": true, + "field": "integer_test" + }, { + "type": "int32", + "optional": true, + "field": "int_x_test" + }, { + "type": "int64", + "optional": true, + "field": "big_int_test" + }], + "optional": true, + "name": "mysql1.inventory.customers.Value", + "field": "before" + }, { + "type": "struct", + "fields": [{ + "type": "int32", + "optional": false, + "field": "id" + }, { + "type": "string", + "optional": false, + "field": "first_name" + }, { + "type": "string", + "optional": false, + "field": "last_name" + }, { + "type": "string", + "optional": false, + "field": "email" + }, { + "type": "boolean", + "optional": true, + "field": "boolean_test" + }, { + "type": "boolean", + "optional": true, + "field": "bool_test" + }, { + "type": "int16", + "optional": true, + "field": "tinyint_test" + }, { + "type": "int16", + "optional": true, + "field": "smallint_test" + }, { + "type": "int32", + "optional": true, + "field": "mediumint_test" + }, { + "type": "int32", + "optional": true, + "field": "int_test" + }, { + "type": "int32", + "optional": true, + "field": "integer_test" + }, { + "type": "int32", + "optional": true, + "field": "int_x_test" + }, { + "type": "int64", + "optional": true, + "field": "big_int_test" + }], + "optional": true, + "name": "mysql1.inventory.customers.Value", + "field": "after" + }, { + "type": "struct", + "fields": [{ + "type": "string", + "optional": false, + "field": "version" + }, { + "type": "string", + "optional": false, + "field": "connector" + }, { + "type": "string", + "optional": false, + "field": "name" + }, { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false,incremental" + }, + "default": "false", + "field": "snapshot" + }, { + "type": "string", + "optional": false, + "field": "db" + }, { + "type": "string", + "optional": true, + "field": "sequence" + }, { + "type": "string", + "optional": true, + "field": "table" + }, { + "type": "int64", + "optional": false, + "field": "server_id" + }, { + "type": "string", + "optional": true, + "field": "gtid" + }, { + "type": "string", + "optional": false, + "field": "file" + }, { + "type": "int64", + "optional": false, + "field": "pos" + }, { + "type": "int32", + "optional": false, + "field": "row" + }, { + "type": "int64", + "optional": true, + "field": "thread" + }, { + "type": "string", + "optional": true, + "field": "query" + }], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, { + "type": "string", + "optional": false, + "field": "op" + }, { + "type": "int64", + "optional": true, + "field": "ts_ms" + }, { + "type": "struct", + "fields": [{ + "type": "string", + "optional": false, + "field": "id" + }, { + "type": "int64", + "optional": false, + "field": "total_order" + }, { + "type": "int64", + "optional": false, + "field": "data_collection_order" + }], + "optional": true, + "name": "event.block", + "version": 1, + "field": "transaction" + }], + "optional": false, + "name": "mysql1.inventory.customers.Envelope", + "version": 1 + }, + "payload": { + "before": { + "id": 1001, + "first_name": "Sally", + "last_name": "Thomas", + "email": "sally.thomas@acme.com", + "boolean_test": true, + "bool_test": false, + "tinyint_test": 1, + "smallint_test": 2, + "mediumint_test": 3, + "int_test": 4, + "integer_test": 5, + "int_x_test": 6, + "big_int_test": 9223372036854775806 + }, + "after": { + "id": 1001, + "first_name": "Sally", + "last_name": "Thomas", + "email": "sally.thomas@acme.com", + "boolean_test": true, + "bool_test": false, + "tinyint_test": 1, + "smallint_test": 2, + "mediumint_test": 3, + "int_test": 4, + "integer_test": 5, + "int_x_test": 7, + "big_int_test": 9223372036854775806 + }, + "source": { + "version": "2.0.1.Final", + "connector": "mysql", + "name": "mysql1", + "ts_ms": 1678735164000, + "snapshot": "false", + "db": "inventory", + "sequence": null, + "table": "customers", + "server_id": 223344, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 3723, + "row": 0, + "thread": 12, + "query": null + }, + "op": "u", + "ts_ms": 1678735164638, + "transaction": null + } +}` + evt, err := m.Debezium.GetEventFromBytes(context.Background(), []byte(payload)) + assert.NoError(m.T(), err) + assert.Equal(m.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime()) + + evtData := evt.GetData(context.Background(), "id", 1001, &kafkalib.TopicConfig{}) + assert.Equal(m.T(), evtData["id"], 1001) + assert.Equal(m.T(), evtData["first_name"], "Sally") + assert.Equal(m.T(), evtData["bool_test"], false) + +} diff --git a/lib/cdc/postgres/debezium.go b/lib/cdc/postgres/debezium.go index a63871d0e..6b01fb690 100644 --- a/lib/cdc/postgres/debezium.go +++ b/lib/cdc/postgres/debezium.go @@ -4,21 +4,16 @@ import ( "context" "encoding/json" "fmt" - "strconv" - "time" - "github.com/artie-labs/transfer/lib/cdc" "github.com/artie-labs/transfer/lib/cdc/util" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/logger" ) type Debezium string func (d *Debezium) GetEventFromBytes(ctx context.Context, bytes []byte) (cdc.Event, error) { - var event SchemaEventPayload + var event util.SchemaEventPayload if len(bytes) == 0 { // This is a Kafka Tombstone event. return &event, nil @@ -48,73 +43,3 @@ func (d *Debezium) GetPrimaryKey(ctx context.Context, key []byte, tc *kafkalib.T return } - -func (s *SchemaEventPayload) GetExecutionTime() time.Time { - return time.UnixMilli(s.Payload.Source.TsMs).UTC() -} - -func (s *SchemaEventPayload) GetData(ctx context.Context, pkName string, pkVal interface{}, tc *kafkalib.TopicConfig) map[string]interface{} { - retMap := make(map[string]interface{}) - if len(s.Payload.After) == 0 { - // This is a delete payload, so mark it as deleted. - // And we need to reconstruct the data bit since it will be empty. - // We _can_ rely on *before* since even without running replicate identity, it will still copy over - // the PK. We can explore simplifying this interface in the future by leveraging before. - retMap = map[string]interface{}{ - constants.DeleteColumnMarker: true, - pkName: pkVal, - } - - // If idempotency key is an empty string, don't put it in the payload data - if tc.IdempotentKey != "" { - retMap[tc.IdempotentKey] = s.GetExecutionTime().Format(time.RFC3339) - } - } else { - retMap = s.Payload.After - retMap[constants.DeleteColumnMarker] = false - } - - // Iterate over the schema and identify if there are any fields that require extra care. - afterSchemaObject := s.Schema.GetSchemaFromLabel(cdc.After) - if afterSchemaObject != nil { - for _, field := range afterSchemaObject.Fields { - // Check if the field is an integer and requires us to cast it as such. - if field.IsInteger() { - valFloat, isOk := retMap[field.FieldName].(float64) - if isOk { - retMap[field.FieldName] = int(valFloat) - continue - } - } - - if valid, supportedType := debezium.RequiresSpecialTypeCasting(field.DebeziumType); valid { - val, isOk := retMap[field.FieldName] - if isOk { - // Need to cast this as a FLOAT first because the number may come out in scientific notation - // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 - floatVal, castErr := strconv.ParseFloat(fmt.Sprint(val), 64) - if castErr == nil { - extendedTime, err := debezium.FromDebeziumTypeToTime(supportedType, int64(floatVal)) - if err == nil { - retMap[field.FieldName] = extendedTime - } else { - logger.FromContext(ctx).WithFields(map[string]interface{}{ - "err": err, - "supportedType": supportedType, - "val": val, - }).Debug("skipped casting dbz type due to an error") - } - } else { - logger.FromContext(ctx).WithFields(map[string]interface{}{ - "err": castErr, - "supportedType": supportedType, - "val": val, - }).Debug("skipped casting because we failed to parse the float") - } - } - } - } - } - - return retMap -} diff --git a/lib/cdc/postgres/debezium_test.go b/lib/cdc/postgres/debezium_test.go index e77203648..93a996a73 100644 --- a/lib/cdc/postgres/debezium_test.go +++ b/lib/cdc/postgres/debezium_test.go @@ -3,7 +3,6 @@ package postgres import ( "context" "fmt" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -33,117 +32,6 @@ func (p *PostgresTestSuite) TestGetPrimaryKeyUUID() { assert.Equal(p.T(), err, nil) } -func (p *PostgresTestSuite) TestSource_GetExecutionTime() { - source := Source{ - Connector: "postgresql", - TsMs: 1665458364942, // Tue Oct 11 2022 03:19:24 - } - - schemaEventPayload := &SchemaEventPayload{ - Payload: payload{Source: source}, - } - - assert.Equal(p.T(), time.Date(2022, time.October, - 11, 3, 19, 24, 942000000, time.UTC), schemaEventPayload.GetExecutionTime()) -} - -func (p *PostgresTestSuite) TestGetDataTestInsert() { - after := map[string]interface{}{ - "pk": 1, - "foo": "bar", - "name": "dusty", - "favoriteFood": "jerky", - } - - var tc kafkalib.TopicConfig - schemaEventPayload := SchemaEventPayload{ - Payload: payload{ - Before: nil, - After: after, - Operation: "c", - }, - } - - evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, &tc) - assert.Equal(p.T(), len(after), len(evtData), "has deletion flag") - - deletionFlag, isOk := evtData[constants.DeleteColumnMarker] - assert.True(p.T(), isOk) - assert.False(p.T(), deletionFlag.(bool)) - - delete(evtData, constants.DeleteColumnMarker) - assert.Equal(p.T(), after, evtData) -} - -func (p *PostgresTestSuite) TestGetDataTestDelete() { - tc := &kafkalib.TopicConfig{ - IdempotentKey: "updated_at", - } - - now := time.Now().UTC() - schemaEventPayload := SchemaEventPayload{ - Payload: payload{ - Before: nil, - After: nil, - Operation: "c", - Source: Source{TsMs: now.UnixMilli()}, - }, - } - - evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, tc) - shouldDelete, isOk := evtData[constants.DeleteColumnMarker] - assert.True(p.T(), isOk) - assert.True(p.T(), shouldDelete.(bool)) - - assert.Equal(p.T(), 3, len(evtData), evtData) - assert.Equal(p.T(), evtData["pk"], 1) - assert.Equal(p.T(), evtData[tc.IdempotentKey], now.Format(time.RFC3339)) - - tc.IdempotentKey = "" - evtData = schemaEventPayload.GetData(context.Background(), "pk", 1, tc) - _, isOk = evtData[tc.IdempotentKey] - assert.False(p.T(), isOk, evtData) -} - -func (p *PostgresTestSuite) TestGetDataTestUpdate() { - before := map[string]interface{}{ - "pk": 1, - "foo": "bar", - "name": "dusty", - "favoriteFood": "apples", - "age": 1, - "weight_lbs": 25, - } - - after := map[string]interface{}{ - "pk": 1, - "foo": "bar", - "name": "dusty", - "favoriteFood": "jerky", - "age": 2, - "weight_lbs": 33, - } - - var tc kafkalib.TopicConfig - schemaEventPayload := SchemaEventPayload{ - Payload: payload{ - Before: before, - After: after, - Operation: "c", - }, - } - - evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, &tc) - assert.Equal(p.T(), len(after), len(evtData), "has deletion flag") - - deletionFlag, isOk := evtData[constants.DeleteColumnMarker] - assert.True(p.T(), isOk) - assert.False(p.T(), deletionFlag.(bool)) - - delete(evtData, constants.DeleteColumnMarker) - assert.Equal(p.T(), after, evtData) -} - func (p *PostgresTestSuite) TestPostgresEvent() { payload := ` { diff --git a/lib/cdc/postgres/event.go b/lib/cdc/postgres/event.go deleted file mode 100644 index 0c65bb6cc..000000000 --- a/lib/cdc/postgres/event.go +++ /dev/null @@ -1,24 +0,0 @@ -package postgres - -import "github.com/artie-labs/transfer/lib/debezium" - -// SchemaEventPayload is our struct for an event with schema enabled. For reference, this is an example payload https://gist.github.com/Tang8330/3b9989ed8c659771958fe481f248397a -type SchemaEventPayload struct { - Schema debezium.Schema `json:"schema"` - Payload payload `json:"payload"` -} - -type payload struct { - Before map[string]interface{} `json:"before"` - After map[string]interface{} `json:"after"` - Source Source `json:"source"` - Operation string `json:"op"` -} - -type Source struct { - Connector string `json:"connector"` - TsMs int64 `json:"ts_ms"` - Database string `json:"db"` - Schema string `json:"schema"` - Table string `json:"table"` -} diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go new file mode 100644 index 000000000..5d84296b1 --- /dev/null +++ b/lib/cdc/util/relational_event.go @@ -0,0 +1,104 @@ +package util + +import ( + "context" + "fmt" + "github.com/artie-labs/transfer/lib/cdc" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/debezium" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/logger" + "strconv" + "time" +) + +// SchemaEventPayload is our struct for an event with schema enabled. For reference, this is an example payload https://gist.github.com/Tang8330/3b9989ed8c659771958fe481f248397a +type SchemaEventPayload struct { + Schema debezium.Schema `json:"schema"` + Payload payload `json:"payload"` +} + +type payload struct { + Before map[string]interface{} `json:"before"` + After map[string]interface{} `json:"after"` + Source Source `json:"source"` + Operation string `json:"op"` +} + +type Source struct { + Connector string `json:"connector"` + TsMs int64 `json:"ts_ms"` + Database string `json:"db"` + Schema string `json:"schema"` + Table string `json:"table"` +} + +func (s *SchemaEventPayload) GetExecutionTime() time.Time { + return time.UnixMilli(s.Payload.Source.TsMs).UTC() +} + +func (s *SchemaEventPayload) GetData(ctx context.Context, pkName string, pkVal interface{}, tc *kafkalib.TopicConfig) map[string]interface{} { + retMap := make(map[string]interface{}) + if len(s.Payload.After) == 0 { + // This is a delete payload, so mark it as deleted. + // And we need to reconstruct the data bit since it will be empty. + // We _can_ rely on *before* since even without running replicate identity, it will still copy over + // the PK. We can explore simplifying this interface in the future by leveraging before. + retMap = map[string]interface{}{ + constants.DeleteColumnMarker: true, + pkName: pkVal, + } + + // If idempotency key is an empty string, don't put it in the payload data + if tc.IdempotentKey != "" { + retMap[tc.IdempotentKey] = s.GetExecutionTime().Format(time.RFC3339) + } + } else { + retMap = s.Payload.After + retMap[constants.DeleteColumnMarker] = false + } + + // Iterate over the schema and identify if there are any fields that require extra care. + afterSchemaObject := s.Schema.GetSchemaFromLabel(cdc.After) + if afterSchemaObject != nil { + for _, field := range afterSchemaObject.Fields { + // Check if the field is an integer and requires us to cast it as such. + if field.IsInteger() { + valFloat, isOk := retMap[field.FieldName].(float64) + if isOk { + retMap[field.FieldName] = int(valFloat) + continue + } + } + + if valid, supportedType := debezium.RequiresSpecialTypeCasting(field.DebeziumType); valid { + val, isOk := retMap[field.FieldName] + if isOk { + // Need to cast this as a FLOAT first because the number may come out in scientific notation + // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 + floatVal, castErr := strconv.ParseFloat(fmt.Sprint(val), 64) + if castErr == nil { + extendedTime, err := debezium.FromDebeziumTypeToTime(supportedType, int64(floatVal)) + if err == nil { + retMap[field.FieldName] = extendedTime + } else { + logger.FromContext(ctx).WithFields(map[string]interface{}{ + "err": err, + "supportedType": supportedType, + "val": val, + }).Debug("skipped casting dbz type due to an error") + } + } else { + logger.FromContext(ctx).WithFields(map[string]interface{}{ + "err": castErr, + "supportedType": supportedType, + "val": val, + }).Debug("skipped casting because we failed to parse the float") + } + } + } + } + } + + return retMap +} diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go new file mode 100644 index 000000000..a036dac27 --- /dev/null +++ b/lib/cdc/util/relational_event_test.go @@ -0,0 +1,121 @@ +package util + +import ( + "context" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestSource_GetExecutionTime(t *testing.T) { + source := Source{ + Connector: "postgresql", + TsMs: 1665458364942, // Tue Oct 11 2022 03:19:24 + } + + schemaEventPayload := &SchemaEventPayload{ + Payload: payload{Source: source}, + } + + assert.Equal(t, time.Date(2022, time.October, + 11, 3, 19, 24, 942000000, time.UTC), schemaEventPayload.GetExecutionTime()) +} + +func TestGetDataTestInsert(t *testing.T) { + after := map[string]interface{}{ + "pk": 1, + "foo": "bar", + "name": "dusty", + "favoriteFood": "jerky", + } + + var tc kafkalib.TopicConfig + schemaEventPayload := SchemaEventPayload{ + Payload: payload{ + Before: nil, + After: after, + Operation: "c", + }, + } + + evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, &tc) + assert.Equal(t, len(after), len(evtData), "has deletion flag") + + deletionFlag, isOk := evtData[constants.DeleteColumnMarker] + assert.True(t, isOk) + assert.False(t, deletionFlag.(bool)) + + delete(evtData, constants.DeleteColumnMarker) + assert.Equal(t, after, evtData) +} + +func TestGetDataTestDelete(t *testing.T) { + tc := &kafkalib.TopicConfig{ + IdempotentKey: "updated_at", + } + + now := time.Now().UTC() + schemaEventPayload := SchemaEventPayload{ + Payload: payload{ + Before: nil, + After: nil, + Operation: "c", + Source: Source{TsMs: now.UnixMilli()}, + }, + } + + evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, tc) + shouldDelete, isOk := evtData[constants.DeleteColumnMarker] + assert.True(t, isOk) + assert.True(t, shouldDelete.(bool)) + + assert.Equal(t, 3, len(evtData), evtData) + assert.Equal(t, evtData["pk"], 1) + assert.Equal(t, evtData[tc.IdempotentKey], now.Format(time.RFC3339)) + + tc.IdempotentKey = "" + evtData = schemaEventPayload.GetData(context.Background(), "pk", 1, tc) + _, isOk = evtData[tc.IdempotentKey] + assert.False(t, isOk, evtData) +} + +func TestGetDataTestUpdate(t *testing.T) { + before := map[string]interface{}{ + "pk": 1, + "foo": "bar", + "name": "dusty", + "favoriteFood": "apples", + "age": 1, + "weight_lbs": 25, + } + + after := map[string]interface{}{ + "pk": 1, + "foo": "bar", + "name": "dusty", + "favoriteFood": "jerky", + "age": 2, + "weight_lbs": 33, + } + + var tc kafkalib.TopicConfig + schemaEventPayload := SchemaEventPayload{ + Payload: payload{ + Before: before, + After: after, + Operation: "c", + }, + } + + evtData := schemaEventPayload.GetData(context.Background(), "pk", 1, &tc) + assert.Equal(t, len(after), len(evtData), "has deletion flag") + + deletionFlag, isOk := evtData[constants.DeleteColumnMarker] + assert.True(t, isOk) + assert.False(t, deletionFlag.(bool)) + + delete(evtData, constants.DeleteColumnMarker) + assert.Equal(t, after, evtData) +} diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index ff96daf7d..914270700 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -16,6 +16,7 @@ const ( DBZPostgresFormat = "debezium.postgres" DBZPostgresAltFormat = "debezium.postgres.wal2json" DBZMongoFormat = "debezium.mongodb" + DBZMySQLFormat = "debezium.mysql" ) // ExporterKind is used for the Telemetry package diff --git a/models/memory.go b/models/memory.go index 5ad9ae948..92196d94a 100644 --- a/models/memory.go +++ b/models/memory.go @@ -104,7 +104,7 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) ( inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.Invalid continue } - + colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[col] if !isOk { inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.ParseValue(val)