Skip to content

Commit

Permalink
[mysql,postgres] Change Debezium conversion for timestamp types
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 27, 2024
1 parent f4001b4 commit 26c4de6
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 20 deletions.
8 changes: 4 additions & 4 deletions integration_tests/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,15 @@ const expectedPayloadTemplate = `{
"optional": false,
"default": null,
"field": "c_datetime",
"name": "io.debezium.time.Timestamp",
"name": "io.debezium.time.ZonedTimestamp",
"parameters": null
},
{
"type": "string",
"type": "int64",
"optional": false,
"default": null,
"field": "c_timestamp",
"name": "io.debezium.time.Timestamp",
"name": "io.debezium.time.MicroTimestamp",
"parameters": null
},
{
Expand Down Expand Up @@ -422,7 +422,7 @@ const expectedPayloadTemplate = `{
"c_smallint": 2,
"c_text": "ZXCV",
"c_time": 14706000000,
"c_timestamp": "2001-02-03T04:05:06Z",
"c_timestamp": 981173106000000,
"c_tinyint": 1,
"c_varbinary": "Qk5N",
"c_varchar": "GHJKL",
Expand Down
8 changes: 4 additions & 4 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,19 +516,19 @@ const expectedPayloadTemplate = `{
"parameters": null
},
{
"type": "string",
"type": "int64",
"optional": false,
"default": null,
"field": "c_timestamp_without_timezone",
"name": "io.debezium.time.Timestamp",
"name": "io.debezium.time.MicroTimestamp",
"parameters": null
},
{
"type": "string",
"optional": false,
"default": null,
"field": "c_timestamp_with_timezone",
"name": "io.debezium.time.Timestamp",
"name": "io.debezium.time.ZonedTimestamp",
"parameters": null
},
{
Expand Down Expand Up @@ -696,7 +696,7 @@ const expectedPayloadTemplate = `{
"c_time_with_timezone": 38057000,
"c_time_without_timezone": 45296000,
"c_timestamp_with_timezone": "2001-02-16T13:38:40Z",
"c_timestamp_without_timezone": "2001-02-16T20:38:40Z",
"c_timestamp_without_timezone": 982355920000000,
"c_tsrange": "[\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")",
"c_tstzrange": "[\"2001-02-16 08:38:40+00\",\"2001-03-20 08:38:40+00\")",
"c_uuid": "e7082e96-7190-4cc3-8ab4-bd27f1269f08",
Expand Down
31 changes: 25 additions & 6 deletions lib/debezium/converters/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,37 @@ func (DateConverter) Convert(value any) (any, error) {
return int32(timeValue.Unix() / (60 * 60 * 24)), nil
}

type TimestampConverter struct{}
type MicroTimestampConverter struct{}

func (TimestampConverter) ToField(name string) debezium.Field {
func (MicroTimestampConverter) ToField(name string) debezium.Field {
// Represents the number of microseconds since the epoch, and does not include timezone information.
return debezium.Field{
FieldName: name,
// NOTE: We are returning string here because we want the right layout to be used by our Typing library
FieldName: name,
Type: "int64",
DebeziumType: string(debezium.MicroTimestamp),
}
}

func (MicroTimestampConverter) Convert(value any) (any, error) {
timeValue, ok := value.(time.Time)
if !ok {
return nil, fmt.Errorf("expected time.Time got %T with value: %v", value, value)
}
return timeValue.UnixMicro(), nil
}

type ZonedTimestampConverter struct{}

func (ZonedTimestampConverter) ToField(name string) debezium.Field {
// A string representation of a timestamp with timezone information, where the timezone is GMT.
return debezium.Field{
FieldName: name,
Type: "string",
DebeziumType: string(debezium.Timestamp),
DebeziumType: string(debezium.DateTimeWithTimezone),
}
}

func (TimestampConverter) Convert(value any) (any, error) {
func (ZonedTimestampConverter) Convert(value any) (any, error) {
timeValue, ok := value.(time.Time)
if !ok {
return nil, fmt.Errorf("expected time.Time got %T with value: %v", value, value)
Expand Down
31 changes: 29 additions & 2 deletions lib/debezium/converters/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,35 @@ func TestDateConverter_Convert(t *testing.T) {
}
}

func TestTimestampConverter_Convert(t *testing.T) {
converter := TimestampConverter{}
func TestMicroTimestampConverter_Convert(t *testing.T) {
converter := MicroTimestampConverter{}
{
// Invalid type
_, err := converter.Convert(1234)
assert.ErrorContains(t, err, "expected time.Time got int with value: 1234")
}
{
// Date > 9999
value, err := converter.Convert(time.Date(9_9999, 2, 3, 4, 5, 0, 0, time.UTC))
assert.NoError(t, err)
assert.Equal(t, value, int64(3093499310700000000))
}
{
// Date < 0
value, err := converter.Convert(time.Date(-1, 2, 3, 4, 5, 0, 0, time.UTC))
assert.NoError(t, err)
assert.Equal(t, int64(-62195889300000000), value)
}
{
// time.Time
value, err := converter.Convert(time.Date(2001, 2, 3, 4, 5, 0, 0, time.UTC))
assert.NoError(t, err)
assert.Equal(t, int64(981173100000000), value)
}
}

func TestZonedTimestampConverter_Convert(t *testing.T) {
converter := ZonedTimestampConverter{}
{
// Invalid type
_, err := converter.Convert(1234)
Expand Down
2 changes: 2 additions & 0 deletions lib/postgres/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var supportedPrimaryKeyDataType []schema.DataType = []schema.DataType{
schema.Time,
schema.Date,
schema.Timestamp,
schema.TimestampWithTimeZone,
schema.Interval,
schema.UUID,
schema.JSON,
Expand Down Expand Up @@ -127,6 +128,7 @@ func (s scanAdapter) ParsePrimaryKeyValue(columnName string, value string) (any,
schema.Time,
schema.Date,
schema.Timestamp,
schema.TimestampWithTimeZone,
schema.Interval,
schema.UUID,
schema.JSON:
Expand Down
5 changes: 4 additions & 1 deletion lib/postgres/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
TimeWithTimeZone
Date
Timestamp
TimestampWithTimeZone
Interval
UUID
Array
Expand Down Expand Up @@ -119,8 +120,10 @@ func ParseColumnDataType(colKind string, precision, scale *int, udtName *string)
return TimeWithTimeZone, nil, nil
case "date":
return Date, nil, nil
case "timestamp without time zone", "timestamp with time zone":
case "timestamp without time zone":
return Timestamp, nil, nil
case "timestamp with time zone":
return TimestampWithTimeZone, nil, nil
case "interval":
return Interval, nil, nil
case "uuid":
Expand Down
6 changes: 4 additions & 2 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.Val
return converters.MicroTimeConverter{}, nil
case schema.Date:
return converters.DateConverter{}, nil
case schema.DateTime, schema.Timestamp:
return converters.TimestampConverter{}, nil
case schema.DateTime:
return converters.ZonedTimestampConverter{}, nil
case schema.Timestamp:
return converters.MicroTimestampConverter{}, nil
case schema.Year:
return converters.YearConverter{}, nil
case schema.Enum:
Expand Down
4 changes: 3 additions & 1 deletion sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ func valueConverterForType(dataType schema.DataType, opts *schema.Opts) (convert
case schema.Date:
return converters.DateConverter{}, nil
case schema.Timestamp:
return converters.TimestampConverter{}, nil
return converters.MicroTimestampConverter{}, nil
case schema.TimestampWithTimeZone:
return converters.ZonedTimestampConverter{}, nil
case schema.Interval:
return PgIntervalConverter{}, nil
case schema.UUID:
Expand Down

0 comments on commit 26c4de6

Please sign in to comment.