From 26c4de6f02701a54aabd1df75fc463bbe2c72a74 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Wed, 27 Mar 2024 15:35:12 -0700 Subject: [PATCH] [mysql,postgres] Change Debezium conversion for timestamp types --- integration_tests/mysql/main.go | 8 +++---- integration_tests/postgres/main.go | 8 +++---- lib/debezium/converters/time.go | 31 ++++++++++++++++++++++------ lib/debezium/converters/time_test.go | 31 ++++++++++++++++++++++++++-- lib/postgres/scanner.go | 2 ++ lib/postgres/schema/schema.go | 5 ++++- sources/mysql/adapter/adapter.go | 6 ++++-- sources/postgres/adapter/adapter.go | 4 +++- 8 files changed, 75 insertions(+), 20 deletions(-) diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index 3e4b3fec..ac671c55 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -293,15 +293,15 @@ const expectedPayloadTemplate = `{ "optional": false, "default": null, "field": "c_datetime", - "name": "io.debezium.time.Timestamp", + "name": "io.debezium.time.ZonedTimestamp", "parameters": null }, { - "type": "string", + "type": "int64", "optional": false, "default": null, "field": "c_timestamp", - "name": "io.debezium.time.Timestamp", + "name": "io.debezium.time.MicroTimestamp", "parameters": null }, { @@ -422,7 +422,7 @@ const expectedPayloadTemplate = `{ "c_smallint": 2, "c_text": "ZXCV", "c_time": 14706000000, - "c_timestamp": "2001-02-03T04:05:06Z", + "c_timestamp": 981173106000000, "c_tinyint": 1, "c_varbinary": "Qk5N", "c_varchar": "GHJKL", diff --git a/integration_tests/postgres/main.go b/integration_tests/postgres/main.go index aea3bd74..c2144c33 100644 --- a/integration_tests/postgres/main.go +++ b/integration_tests/postgres/main.go @@ -516,11 +516,11 @@ const expectedPayloadTemplate = `{ "parameters": null }, { - "type": "string", + "type": "int64", "optional": false, "default": null, "field": "c_timestamp_without_timezone", - "name": "io.debezium.time.Timestamp", + "name": "io.debezium.time.MicroTimestamp", "parameters": null }, { @@ -528,7 +528,7 @@ const expectedPayloadTemplate = `{ "optional": false, "default": null, "field": "c_timestamp_with_timezone", - "name": "io.debezium.time.Timestamp", + "name": "io.debezium.time.ZonedTimestamp", "parameters": null }, { @@ -696,7 +696,7 @@ const expectedPayloadTemplate = `{ "c_time_with_timezone": 38057000, "c_time_without_timezone": 45296000, "c_timestamp_with_timezone": "2001-02-16T13:38:40Z", - "c_timestamp_without_timezone": "2001-02-16T20:38:40Z", + "c_timestamp_without_timezone": 982355920000000, "c_tsrange": "[\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", "c_tstzrange": "[\"2001-02-16 08:38:40+00\",\"2001-03-20 08:38:40+00\")", "c_uuid": "e7082e96-7190-4cc3-8ab4-bd27f1269f08", diff --git a/lib/debezium/converters/time.go b/lib/debezium/converters/time.go index 62cd761e..9debbe69 100644 --- a/lib/debezium/converters/time.go +++ b/lib/debezium/converters/time.go @@ -67,18 +67,37 @@ func (DateConverter) Convert(value any) (any, error) { return int32(timeValue.Unix() / (60 * 60 * 24)), nil } -type TimestampConverter struct{} +type MicroTimestampConverter struct{} -func (TimestampConverter) ToField(name string) debezium.Field { +func (MicroTimestampConverter) ToField(name string) debezium.Field { + // Represents the number of microseconds since the epoch, and does not include timezone information. return debezium.Field{ - FieldName: name, - // NOTE: We are returning string here because we want the right layout to be used by our Typing library + FieldName: name, + Type: "int64", + DebeziumType: string(debezium.MicroTimestamp), + } +} + +func (MicroTimestampConverter) Convert(value any) (any, error) { + timeValue, ok := value.(time.Time) + if !ok { + return nil, fmt.Errorf("expected time.Time got %T with value: %v", value, value) + } + return timeValue.UnixMicro(), nil +} + +type ZonedTimestampConverter struct{} + +func (ZonedTimestampConverter) ToField(name string) debezium.Field { + // A string representation of a timestamp with timezone information, where the timezone is GMT. + return debezium.Field{ + FieldName: name, Type: "string", - DebeziumType: string(debezium.Timestamp), + DebeziumType: string(debezium.DateTimeWithTimezone), } } -func (TimestampConverter) Convert(value any) (any, error) { +func (ZonedTimestampConverter) Convert(value any) (any, error) { timeValue, ok := value.(time.Time) if !ok { return nil, fmt.Errorf("expected time.Time got %T with value: %v", value, value) diff --git a/lib/debezium/converters/time_test.go b/lib/debezium/converters/time_test.go index a5a142eb..32608822 100644 --- a/lib/debezium/converters/time_test.go +++ b/lib/debezium/converters/time_test.go @@ -121,8 +121,35 @@ func TestDateConverter_Convert(t *testing.T) { } } -func TestTimestampConverter_Convert(t *testing.T) { - converter := TimestampConverter{} +func TestMicroTimestampConverter_Convert(t *testing.T) { + converter := MicroTimestampConverter{} + { + // Invalid type + _, err := converter.Convert(1234) + assert.ErrorContains(t, err, "expected time.Time got int with value: 1234") + } + { + // Date > 9999 + value, err := converter.Convert(time.Date(9_9999, 2, 3, 4, 5, 0, 0, time.UTC)) + assert.NoError(t, err) + assert.Equal(t, value, int64(3093499310700000000)) + } + { + // Date < 0 + value, err := converter.Convert(time.Date(-1, 2, 3, 4, 5, 0, 0, time.UTC)) + assert.NoError(t, err) + assert.Equal(t, int64(-62195889300000000), value) + } + { + // time.Time + value, err := converter.Convert(time.Date(2001, 2, 3, 4, 5, 0, 0, time.UTC)) + assert.NoError(t, err) + assert.Equal(t, int64(981173100000000), value) + } +} + +func TestZonedTimestampConverter_Convert(t *testing.T) { + converter := ZonedTimestampConverter{} { // Invalid type _, err := converter.Convert(1234) diff --git a/lib/postgres/scanner.go b/lib/postgres/scanner.go index 9a0c1c85..dfdc30d3 100644 --- a/lib/postgres/scanner.go +++ b/lib/postgres/scanner.go @@ -33,6 +33,7 @@ var supportedPrimaryKeyDataType []schema.DataType = []schema.DataType{ schema.Time, schema.Date, schema.Timestamp, + schema.TimestampWithTimeZone, schema.Interval, schema.UUID, schema.JSON, @@ -127,6 +128,7 @@ func (s scanAdapter) ParsePrimaryKeyValue(columnName string, value string) (any, schema.Time, schema.Date, schema.Timestamp, + schema.TimestampWithTimeZone, schema.Interval, schema.UUID, schema.JSON: diff --git a/lib/postgres/schema/schema.go b/lib/postgres/schema/schema.go index 66c4a0b6..a33d5691 100644 --- a/lib/postgres/schema/schema.go +++ b/lib/postgres/schema/schema.go @@ -32,6 +32,7 @@ const ( TimeWithTimeZone Date Timestamp + TimestampWithTimeZone Interval UUID Array @@ -119,8 +120,10 @@ func ParseColumnDataType(colKind string, precision, scale *int, udtName *string) return TimeWithTimeZone, nil, nil case "date": return Date, nil, nil - case "timestamp without time zone", "timestamp with time zone": + case "timestamp without time zone": return Timestamp, nil, nil + case "timestamp with time zone": + return TimestampWithTimeZone, nil, nil case "interval": return Interval, nil, nil case "uuid": diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index 124da1ac..5ddf9d78 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -110,8 +110,10 @@ func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.Val return converters.MicroTimeConverter{}, nil case schema.Date: return converters.DateConverter{}, nil - case schema.DateTime, schema.Timestamp: - return converters.TimestampConverter{}, nil + case schema.DateTime: + return converters.ZonedTimestampConverter{}, nil + case schema.Timestamp: + return converters.MicroTimestampConverter{}, nil case schema.Year: return converters.YearConverter{}, nil case schema.Enum: diff --git a/sources/postgres/adapter/adapter.go b/sources/postgres/adapter/adapter.go index ec32d5df..77cadcc9 100644 --- a/sources/postgres/adapter/adapter.go +++ b/sources/postgres/adapter/adapter.go @@ -106,7 +106,9 @@ func valueConverterForType(dataType schema.DataType, opts *schema.Opts) (convert case schema.Date: return converters.DateConverter{}, nil case schema.Timestamp: - return converters.TimestampConverter{}, nil + return converters.MicroTimestampConverter{}, nil + case schema.TimestampWithTimeZone: + return converters.ZonedTimestampConverter{}, nil case schema.Interval: return PgIntervalConverter{}, nil case schema.UUID: