From 0db1d0755dbbd6217561dbbf47f4f83e21d0b70f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 28 Jul 2024 22:41:40 -0700 Subject: [PATCH] Do Time too. --- lib/debezium/converters/date.go | 3 +-- lib/debezium/converters/time.go | 16 ++++++++++++++++ lib/debezium/schema.go | 4 +++- lib/debezium/types.go | 3 --- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/lib/debezium/converters/date.go b/lib/debezium/converters/date.go index 046b64541..5a0c6aded 100644 --- a/lib/debezium/converters/date.go +++ b/lib/debezium/converters/date.go @@ -20,7 +20,6 @@ func (Date) Convert(value any) (any, error) { return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value) } - unix := time.UnixMilli(0).In(time.UTC) // 1970-01-01 // Represents the number of days since the epoch. - return ext.NewExtendedTime(unix.AddDate(0, 0, int(valueInt64)), ext.DateKindType, ""), nil + return ext.NewExtendedTime(time.UnixMilli(0).In(time.UTC).AddDate(0, 0, int(valueInt64)), ext.DateKindType, ""), nil } diff --git a/lib/debezium/converters/time.go b/lib/debezium/converters/time.go index 4e69a6669..9504b9021 100644 --- a/lib/debezium/converters/time.go +++ b/lib/debezium/converters/time.go @@ -9,6 +9,22 @@ import ( "github.com/artie-labs/transfer/lib/typing/ext" ) +type Time struct{} + +func (Time) ToKindDetails() typing.KindDetails { + return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType) +} + +func (Time) Convert(val any) (any, error) { + valInt64, isOk := val.(int64) + if isOk { + return nil, fmt.Errorf("expected int64 got '%v' with type %T", val, val) + } + + // Represents the number of milliseconds past midnight, and does not include timezone information. + return ext.NewExtendedTime(time.UnixMilli(valInt64).In(time.UTC), ext.TimeKindType, ""), nil +} + type DateTimeWithTimezone struct{} func (DateTimeWithTimezone) ToKindDetails() typing.KindDetails { diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index a502ba35a..2453f6066 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -94,6 +94,8 @@ func (f Field) ToValueConverter() converters.ValueConverter { return converters.JSON{} case Date, DateKafkaConnect: return converters.Date{} + case Time, TimeKafkaConnect: + return converters.Time{} } return nil @@ -112,7 +114,7 @@ func (f Field) ToKindDetails() typing.KindDetails { switch f.DebeziumType { case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect: return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType) - case Time, MicroTime, NanoTime, TimeKafkaConnect: + case MicroTime, NanoTime: return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType) case KafkaDecimalType: scale, precisionPtr, err := f.GetScaleAndPrecision() diff --git a/lib/debezium/types.go b/lib/debezium/types.go index 19912bffe..8eb6b241a 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -170,9 +170,6 @@ func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ex case NanoTimestamp: // Represents the number of nanoseconds past the epoch, and does not include timezone information. extTime = ext.NewExtendedTime(time.UnixMicro(val/1_000).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano) - case Time, TimeKafkaConnect: - // Represents the number of milliseconds past midnight, and does not include timezone information. - extTime = ext.NewExtendedTime(time.UnixMilli(val).In(time.UTC), ext.TimeKindType, "") case MicroTime: // Represents the number of microseconds past midnight, and does not include timezone information. extTime = ext.NewExtendedTime(time.UnixMicro(val).In(time.UTC), ext.TimeKindType, "")