Skip to content

Commit

Permalink
Fix MySQL snapshot error (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 5, 2024
1 parent ce125be commit 0dad82e
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 41 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.27.27
github.com/artie-labs/transfer v1.27.28
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.27.27 h1:E0EmZdwcCR9jhxELa85tzlMSn6OAPolpsbohwK2aRb0=
github.com/artie-labs/transfer v1.27.27/go.mod h1:WEUeVNNyufZIzBJsqVAXC7FkQDvSeAWJS7lgtMNLiS8=
github.com/artie-labs/transfer v1.27.28 h1:jga4T/aq+l4WCmq08etz5T+P6aeYBB4XFE6a3/II4ss=
github.com/artie-labs/transfer v1.27.28/go.mod h1:WEUeVNNyufZIzBJsqVAXC7FkQDvSeAWJS7lgtMNLiS8=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
66 changes: 39 additions & 27 deletions integration_tests/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const testTypesCreateTableQuery = `
CREATE TABLE %s (
pk INTEGER PRIMARY KEY NOT NULL,
c_tinyint TINYINT,
c_tinyint_edgecase TINYINT(1),
c_tinyint_unsigned TINYINT UNSIGNED,
c_smallint SMALLINT,
c_smallint_unsigned SMALLINT UNSIGNED,
Expand Down Expand Up @@ -124,6 +125,8 @@ INSERT INTO %s VALUES (
1,
-- c_tinyint
1,
-- c_tinyint_edgecase
127,
-- c_smallint_unsigned
2,
-- c_smallint
Expand Down Expand Up @@ -224,6 +227,14 @@ const expectedPayloadTemplate = `{
"name": "",
"parameters": null
},
{
"type": "int16",
"optional": false,
"default": null,
"field": "c_tinyint_edgecase",
"name": "",
"parameters": null
},
{
"type": "int16",
"optional": false,
Expand Down Expand Up @@ -335,7 +346,7 @@ const expectedPayloadTemplate = `{
"parameters": null
},
{
"type": "boolean",
"type": "int16",
"optional": false,
"default": null,
"field": "c_boolean",
Expand Down Expand Up @@ -539,7 +550,7 @@ const expectedPayloadTemplate = `{
"c_binary": "QVNERgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"c_bit": true,
"c_blob": "UVdFUg==",
"c_boolean": false,
"c_boolean": 0,
"c_char": "X",
"c_date": 18263,
"c_date_0000_00_00": null,
Expand Down Expand Up @@ -593,6 +604,7 @@ const expectedPayloadTemplate = `{
"c_time": 14706000000,
"c_timestamp": "2001-02-03T04:05:06Z",
"c_tinyint": 1,
"c_tinyint_edgecase": 127,
"c_tinyint_unsigned": 2,
"c_varbinary": "Qk5N",
"c_varchar": "GHJKL",
Expand Down Expand Up @@ -706,31 +718,31 @@ func testScan(db *sql.DB, dbName string) error {
}

expectedPartitionKeys := []map[string]any{
{"c_int_pk": int64(4), "c_boolean_pk": false, "c_text_pk": "jn"},
{"c_int_pk": int64(5), "c_boolean_pk": false, "c_text_pk": "rn"},
{"c_int_pk": int64(7), "c_boolean_pk": true, "c_text_pk": "rr"},
{"c_int_pk": int64(13), "c_boolean_pk": true, "c_text_pk": "rd"},
{"c_int_pk": int64(15), "c_boolean_pk": false, "c_text_pk": "jr"},
{"c_int_pk": int64(26), "c_boolean_pk": false, "c_text_pk": "dr"},
{"c_int_pk": int64(27), "c_boolean_pk": false, "c_text_pk": "jr"},
{"c_int_pk": int64(29), "c_boolean_pk": false, "c_text_pk": "nr"},
{"c_int_pk": int64(29), "c_boolean_pk": false, "c_text_pk": "rj"},
{"c_int_pk": int64(35), "c_boolean_pk": false, "c_text_pk": "dr"},
{"c_int_pk": int64(41), "c_boolean_pk": true, "c_text_pk": "nr"},
{"c_int_pk": int64(45), "c_boolean_pk": true, "c_text_pk": "nr"},
{"c_int_pk": int64(46), "c_boolean_pk": false, "c_text_pk": "dj"},
{"c_int_pk": int64(54), "c_boolean_pk": true, "c_text_pk": "rd"},
{"c_int_pk": int64(57), "c_boolean_pk": false, "c_text_pk": "nj"},
{"c_int_pk": int64(60), "c_boolean_pk": true, "c_text_pk": "rj"},
{"c_int_pk": int64(62), "c_boolean_pk": false, "c_text_pk": "nn"},
{"c_int_pk": int64(73), "c_boolean_pk": false, "c_text_pk": "dr"},
{"c_int_pk": int64(86), "c_boolean_pk": false, "c_text_pk": "rn"},
{"c_int_pk": int64(87), "c_boolean_pk": false, "c_text_pk": "nr"},
{"c_int_pk": int64(88), "c_boolean_pk": false, "c_text_pk": "rr"},
{"c_int_pk": int64(88), "c_boolean_pk": true, "c_text_pk": "rj"},
{"c_int_pk": int64(89), "c_boolean_pk": true, "c_text_pk": "dn"},
{"c_int_pk": int64(91), "c_boolean_pk": false, "c_text_pk": "nj"},
{"c_int_pk": int64(94), "c_boolean_pk": false, "c_text_pk": "dn"},
{"c_int_pk": int64(4), "c_boolean_pk": int64(0), "c_text_pk": "jn"},
{"c_int_pk": int64(5), "c_boolean_pk": int64(0), "c_text_pk": "rn"},
{"c_int_pk": int64(7), "c_boolean_pk": int64(1), "c_text_pk": "rr"},
{"c_int_pk": int64(13), "c_boolean_pk": int64(1), "c_text_pk": "rd"},
{"c_int_pk": int64(15), "c_boolean_pk": int64(0), "c_text_pk": "jr"},
{"c_int_pk": int64(26), "c_boolean_pk": int64(0), "c_text_pk": "dr"},
{"c_int_pk": int64(27), "c_boolean_pk": int64(0), "c_text_pk": "jr"},
{"c_int_pk": int64(29), "c_boolean_pk": int64(0), "c_text_pk": "nr"},
{"c_int_pk": int64(29), "c_boolean_pk": int64(0), "c_text_pk": "rj"},
{"c_int_pk": int64(35), "c_boolean_pk": int64(0), "c_text_pk": "dr"},
{"c_int_pk": int64(41), "c_boolean_pk": int64(1), "c_text_pk": "nr"},
{"c_int_pk": int64(45), "c_boolean_pk": int64(1), "c_text_pk": "nr"},
{"c_int_pk": int64(46), "c_boolean_pk": int64(0), "c_text_pk": "dj"},
{"c_int_pk": int64(54), "c_boolean_pk": int64(1), "c_text_pk": "rd"},
{"c_int_pk": int64(57), "c_boolean_pk": int64(0), "c_text_pk": "nj"},
{"c_int_pk": int64(60), "c_boolean_pk": int64(1), "c_text_pk": "rj"},
{"c_int_pk": int64(62), "c_boolean_pk": int64(0), "c_text_pk": "nn"},
{"c_int_pk": int64(73), "c_boolean_pk": int64(0), "c_text_pk": "dr"},
{"c_int_pk": int64(86), "c_boolean_pk": int64(0), "c_text_pk": "rn"},
{"c_int_pk": int64(87), "c_boolean_pk": int64(0), "c_text_pk": "nr"},
{"c_int_pk": int64(88), "c_boolean_pk": int64(0), "c_text_pk": "rr"},
{"c_int_pk": int64(88), "c_boolean_pk": int64(1), "c_text_pk": "rj"},
{"c_int_pk": int64(89), "c_boolean_pk": int64(1), "c_text_pk": "dn"},
{"c_int_pk": int64(91), "c_boolean_pk": int64(0), "c_text_pk": "nj"},
{"c_int_pk": int64(94), "c_boolean_pk": int64(0), "c_text_pk": "dn"},
}
expectedValues := []string{
"row 3",
Expand Down
20 changes: 17 additions & 3 deletions lib/debezium/converters/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package converters
import (
"fmt"
"log/slog"
"strconv"
"strings"
"time"

"github.com/artie-labs/transfer/lib/debezium"
Expand Down Expand Up @@ -91,10 +93,22 @@ func (DateConverter) Convert(value any) (any, error) {
case time.Time:
timeValue = castValue
case string:
if castValue == "0000-00-00" {
// MySQL supports '0000-00-00' for date columns
return nil, nil
parts := strings.Split(castValue, "-")
if len(parts) == 3 {
for _, part := range parts {
castedPart, err := strconv.ParseInt(part, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse date %q: %w", castValue, err)
}

if castedPart <= 0 {
slog.Warn(fmt.Sprintf("Skipping invalid value: %q", castValue))
// MySQL supports '0000-00-00' for date columns if strict mode is not enabled.
return nil, nil
}
}
}

var err error
timeValue, err = time.Parse(time.DateOnly, castValue)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion lib/debezium/converters/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func TestDateConverter_Convert(t *testing.T) {
_, err := converter.Convert(12345)
assert.ErrorContains(t, err, "expected string/time.Time got int with value: 12345")
}
{
// string - 00 month
value, err := converter.Convert("2024-00-08")
assert.NoError(t, err)
assert.Equal(t, nil, value)
}
{
// string - 0000-00-00
value, err := converter.Convert("0000-00-00")
Expand All @@ -123,7 +129,7 @@ func TestDateConverter_Convert(t *testing.T) {
{
// string - malformed
_, err := converter.Convert("aaaa-bb-cc")
assert.ErrorContains(t, err, `failed to convert to date: parsing time "aaaa-bb-cc" as "2006-01-02"`)
assert.ErrorContains(t, err, `failed to parse date "aaaa-bb-cc": strconv.ParseInt: parsing "aaaa": invalid syntax`)
}
{
// string - 2023-05-03
Expand Down
5 changes: 0 additions & 5 deletions lib/mysql/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ func parseColumnDataType(originalS string) (DataType, *Opts, error) {

switch s {
case "tinyint":
// Boolean, bool are aliases for tinyint(1)
if metadata == "1" {
return Boolean, nil, nil
}

if unsigned {
return SmallInt, nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions lib/mysql/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func TestParseColumnDataType(t *testing.T) {
}
}
{
// tinyint(1) or boolean
// tinyint(1) should still be an integer
dataType, _, err := parseColumnDataType("tinyint(1)")
assert.NoError(t, err)
assert.Equal(t, Boolean, dataType)
assert.Equal(t, TinyInt, dataType)
}
{
// String
Expand Down

0 comments on commit 0dad82e

Please sign in to comment.