From 9ec6c978d53bb7cf8433b4f3732a612e6ca6b659 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 29 Jul 2024 15:44:13 -0700 Subject: [PATCH] [Debezium] Add Date and Time converters (#821) --- lib/debezium/converters/date.go | 25 +++++++++++++++++++++ lib/debezium/converters/date_test.go | 33 ++++++++++++++++++++++++++++ lib/debezium/converters/time.go | 16 ++++++++++++++ lib/debezium/converters/time_test.go | 11 ++++++++++ lib/debezium/schema.go | 8 ++++--- lib/debezium/types.go | 12 ---------- lib/debezium/types_test.go | 16 -------------- 7 files changed, 90 insertions(+), 31 deletions(-) create mode 100644 lib/debezium/converters/date.go create mode 100644 lib/debezium/converters/date_test.go diff --git a/lib/debezium/converters/date.go b/lib/debezium/converters/date.go new file mode 100644 index 000000000..5a0c6aded --- /dev/null +++ b/lib/debezium/converters/date.go @@ -0,0 +1,25 @@ +package converters + +import ( + "fmt" + "time" + + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/ext" +) + +type Date struct{} + +func (Date) ToKindDetails() typing.KindDetails { + return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType) +} + +func (Date) Convert(value any) (any, error) { + valueInt64, isOk := value.(int64) + if !isOk { + return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value) + } + + // Represents the number of days since the epoch. + return ext.NewExtendedTime(time.UnixMilli(0).In(time.UTC).AddDate(0, 0, int(valueInt64)), ext.DateKindType, ""), nil +} diff --git a/lib/debezium/converters/date_test.go b/lib/debezium/converters/date_test.go new file mode 100644 index 000000000..870124d24 --- /dev/null +++ b/lib/debezium/converters/date_test.go @@ -0,0 +1,33 @@ +package converters + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/typing/ext" + + "github.com/stretchr/testify/assert" +) + +func TestDate_Convert(t *testing.T) { + { + // Invalid data type + _, err := Date{}.Convert("invalid") + assert.ErrorContains(t, err, "expected int64 got 'invalid' with type string") + } + { + val, err := Date{}.Convert(int64(19401)) + assert.NoError(t, err) + + extTime, isOk := val.(*ext.ExtendedTime) + assert.True(t, isOk) + assert.Equal(t, "2023-02-13", extTime.String("")) + } + { + val, err := Date{}.Convert(int64(19429)) + assert.NoError(t, err) + + extTime, isOk := val.(*ext.ExtendedTime) + assert.True(t, isOk) + assert.Equal(t, "2023-03-13", extTime.String("")) + } +} diff --git a/lib/debezium/converters/time.go b/lib/debezium/converters/time.go index 4e69a6669..412dedb6b 100644 --- a/lib/debezium/converters/time.go +++ b/lib/debezium/converters/time.go @@ -9,6 +9,22 @@ import ( "github.com/artie-labs/transfer/lib/typing/ext" ) +type Time struct{} + +func (Time) ToKindDetails() typing.KindDetails { + return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType) +} + +func (Time) Convert(val any) (any, error) { + valInt64, isOk := val.(int64) + if !isOk { + return nil, fmt.Errorf("expected int64 got '%v' with type %T", val, val) + } + + // Represents the number of milliseconds past midnight, and does not include timezone information. + return ext.NewExtendedTime(time.UnixMilli(valInt64).In(time.UTC), ext.TimeKindType, ""), nil +} + type DateTimeWithTimezone struct{} func (DateTimeWithTimezone) ToKindDetails() typing.KindDetails { diff --git a/lib/debezium/converters/time_test.go b/lib/debezium/converters/time_test.go index 1424f2d42..c73aec38f 100644 --- a/lib/debezium/converters/time_test.go +++ b/lib/debezium/converters/time_test.go @@ -47,6 +47,17 @@ func TestConvertDateTimeWithTimezone(t *testing.T) { } } +func TestTime_Convert(t *testing.T) { + { + val, err := Time{}.Convert(int64(54720000)) + assert.NoError(t, err) + + extTime, isOk := val.(*ext.ExtendedTime) + assert.True(t, isOk) + assert.Equal(t, "15:12:00+00", extTime.String("")) + } +} + func TestConvertTimeWithTimezone(t *testing.T) { { // Invalid diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index f385800ab..2453f6066 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -92,6 +92,10 @@ func (f Field) ToValueConverter() converters.ValueConverter { return converters.Geometry{} case JSON: return converters.JSON{} + case Date, DateKafkaConnect: + return converters.Date{} + case Time, TimeKafkaConnect: + return converters.Time{} } return nil @@ -110,9 +114,7 @@ func (f Field) ToKindDetails() typing.KindDetails { switch f.DebeziumType { case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect: return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType) - case Date, DateKafkaConnect: - return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType) - case Time, MicroTime, NanoTime, TimeKafkaConnect: + case MicroTime, NanoTime: return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType) case KafkaDecimalType: scale, precisionPtr, err := f.GetScaleAndPrecision() diff --git a/lib/debezium/types.go b/lib/debezium/types.go index b712950a3..8ce2a17b6 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -131,14 +131,9 @@ func (f Field) ParseValue(value any) (any, error) { Timestamp, MicroTimestamp, NanoTimestamp, - Date, - Time, NanoTime, MicroTime, - DateKafkaConnect, - TimeKafkaConnect, DateTimeKafkaConnect: - int64Value, ok := value.(int64) if !ok { return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value) @@ -170,13 +165,6 @@ func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ex case NanoTimestamp: // Represents the number of nanoseconds past the epoch, and does not include timezone information. extTime = ext.NewExtendedTime(time.UnixMicro(val/1_000).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano) - case Date, DateKafkaConnect: - unix := time.UnixMilli(0).In(time.UTC) // 1970-01-01 - // Represents the number of days since the epoch. - extTime = ext.NewExtendedTime(unix.AddDate(0, 0, int(val)), ext.DateKindType, "") - case Time, TimeKafkaConnect: - // Represents the number of milliseconds past midnight, and does not include timezone information. - extTime = ext.NewExtendedTime(time.UnixMilli(val).In(time.UTC), ext.TimeKindType, "") case MicroTime: // Represents the number of microseconds past midnight, and does not include timezone information. extTime = ext.NewExtendedTime(time.UnixMicro(val).In(time.UTC), ext.TimeKindType, "") diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index 2410bfa37..e0084ce7f 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -383,27 +383,11 @@ func TestField_ParseValue(t *testing.T) { } } -func TestFromDebeziumTypeToTime(t *testing.T) { - dt, err := FromDebeziumTypeToTime(Date, int64(19401)) - assert.Equal(t, "2023-02-13", dt.String("")) - assert.NoError(t, err) -} - func TestFromDebeziumTypeTimePrecisionConnect(t *testing.T) { // Timestamp extendedTimestamp, err := FromDebeziumTypeToTime(DateTimeKafkaConnect, 1678901050700) assert.NoError(t, err) assert.Equal(t, time.Date(2023, 03, 15, 17, 24, 10, 700000000, time.UTC), extendedTimestamp.Time) - - // Time - extendedTime, timeErr := FromDebeziumTypeToTime(TimeKafkaConnect, 54720000) - assert.NoError(t, timeErr) - assert.Equal(t, "15:12:00+00", extendedTime.String("")) - - // Date - extendedDate, dateErr := FromDebeziumTypeToTime(DateKafkaConnect, 19429) - assert.NoError(t, dateErr) - assert.Equal(t, "2023-03-13", extendedDate.String("")) } func TestField_DecodeDecimal(t *testing.T) {