Skip to content

Commit

Permalink
[Debezium] Add Date and Time converters (#821)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 29, 2024
1 parent e202e16 commit 9ec6c97
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 31 deletions.
25 changes: 25 additions & 0 deletions lib/debezium/converters/date.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package converters

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

// Date is the value converter for Debezium date fields, whose payload is a
// count of days since the Unix epoch.
type Date struct{}

// ToKindDetails reports the kind this converter produces: the ETime template
// tagged with ext.DateKindType.
func (Date) ToKindDetails() typing.KindDetails {
	return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)
}

// Convert turns an int64 day count into the value built by
// ext.NewExtendedTime for the date kind. Any other dynamic type (including
// nil) is rejected with an error that names the offending value and type.
func (Date) Convert(value any) (any, error) {
	switch days := value.(type) {
	case int64:
		// The payload is the number of days since the epoch, so start from
		// 1970-01-01 in UTC and step forward by that many calendar days.
		epoch := time.UnixMilli(0).In(time.UTC)
		return ext.NewExtendedTime(epoch.AddDate(0, 0, int(days)), ext.DateKindType, ""), nil
	default:
		return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value)
	}
}
33 changes: 33 additions & 0 deletions lib/debezium/converters/date_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package converters

import (
"testing"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"
)

// TestDate_Convert covers the rejection of non-int64 input and the mapping of
// day counts to their rendered calendar dates.
func TestDate_Convert(t *testing.T) {
	// Invalid data type
	_, invalidErr := Date{}.Convert("invalid")
	assert.ErrorContains(t, invalidErr, "expected int64 got 'invalid' with type string")

	// Each entry pairs a days-since-epoch input with the date string it should render as.
	cases := []struct {
		days     int64
		expected string
	}{
		{days: 19401, expected: "2023-02-13"},
		{days: 19429, expected: "2023-03-13"},
	}

	for _, tc := range cases {
		converted, err := Date{}.Convert(tc.days)
		assert.NoError(t, err)

		extTime, isOk := converted.(*ext.ExtendedTime)
		assert.True(t, isOk)
		assert.Equal(t, tc.expected, extTime.String(""))
	}
}
16 changes: 16 additions & 0 deletions lib/debezium/converters/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ import (
"github.com/artie-labs/transfer/lib/typing/ext"
)

// Time is the value converter for Debezium time fields, whose payload is a
// count of milliseconds past midnight without timezone information.
type Time struct{}

// ToKindDetails reports the kind this converter produces: the ETime template
// tagged with ext.TimeKindType.
func (Time) ToKindDetails() typing.KindDetails {
	return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)
}

// Convert turns an int64 millisecond count into the value built by
// ext.NewExtendedTime for the time kind. Any other dynamic type (including
// nil) is rejected with an error that names the offending value and type.
func (Time) Convert(val any) (any, error) {
	if millis, ok := val.(int64); ok {
		// Milliseconds past midnight are anchored to the epoch day and pinned
		// to UTC, since the payload carries no timezone of its own.
		sinceMidnight := time.UnixMilli(millis).In(time.UTC)
		return ext.NewExtendedTime(sinceMidnight, ext.TimeKindType, ""), nil
	}

	return nil, fmt.Errorf("expected int64 got '%v' with type %T", val, val)
}

type DateTimeWithTimezone struct{}

func (DateTimeWithTimezone) ToKindDetails() typing.KindDetails {
Expand Down
11 changes: 11 additions & 0 deletions lib/debezium/converters/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func TestConvertDateTimeWithTimezone(t *testing.T) {
}
}

// TestTime_Convert covers both paths of Time.Convert: rejection of input that
// is not an int64, and conversion of a milliseconds-past-midnight value.
func TestTime_Convert(t *testing.T) {
	{
		// Invalid data type: anything other than int64 must produce the
		// converter's descriptive error rather than a value.
		_, err := Time{}.Convert("invalid")
		assert.ErrorContains(t, err, "expected int64 got 'invalid' with type string")
	}
	{
		// 54,720,000 ms past midnight is 15:12:00.
		val, err := Time{}.Convert(int64(54720000))
		assert.NoError(t, err)

		extTime, isOk := val.(*ext.ExtendedTime)
		// Only dereference when the assertion holds, so a wrong return type
		// is reported as a test failure instead of a nil-pointer panic.
		if assert.True(t, isOk) {
			assert.Equal(t, "15:12:00+00", extTime.String(""))
		}
	}
}

func TestConvertTimeWithTimezone(t *testing.T) {
{
// Invalid
Expand Down
8 changes: 5 additions & 3 deletions lib/debezium/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (f Field) ToValueConverter() converters.ValueConverter {
return converters.Geometry{}
case JSON:
return converters.JSON{}
case Date, DateKafkaConnect:
return converters.Date{}
case Time, TimeKafkaConnect:
return converters.Time{}
}

return nil
Expand All @@ -110,9 +114,7 @@ func (f Field) ToKindDetails() typing.KindDetails {
switch f.DebeziumType {
case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect:
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)
case Date, DateKafkaConnect:
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)
case Time, MicroTime, NanoTime, TimeKafkaConnect:
case MicroTime, NanoTime:
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)
case KafkaDecimalType:
scale, precisionPtr, err := f.GetScaleAndPrecision()
Expand Down
12 changes: 0 additions & 12 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,9 @@ func (f Field) ParseValue(value any) (any, error) {
Timestamp,
MicroTimestamp,
NanoTimestamp,
Date,
Time,
NanoTime,
MicroTime,
DateKafkaConnect,
TimeKafkaConnect,
DateTimeKafkaConnect:

int64Value, ok := value.(int64)
if !ok {
return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value)
Expand Down Expand Up @@ -170,13 +165,6 @@ func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ex
case NanoTimestamp:
// Represents the number of nanoseconds past the epoch, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMicro(val/1_000).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano)
case Date, DateKafkaConnect:
unix := time.UnixMilli(0).In(time.UTC) // 1970-01-01
// Represents the number of days since the epoch.
extTime = ext.NewExtendedTime(unix.AddDate(0, 0, int(val)), ext.DateKindType, "")
case Time, TimeKafkaConnect:
// Represents the number of milliseconds past midnight, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMilli(val).In(time.UTC), ext.TimeKindType, "")
case MicroTime:
// Represents the number of microseconds past midnight, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMicro(val).In(time.UTC), ext.TimeKindType, "")
Expand Down
16 changes: 0 additions & 16 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,27 +383,11 @@ func TestField_ParseValue(t *testing.T) {
}
}

// TestFromDebeziumTypeToTime checks that a Date value of 19401 days since the
// epoch renders as 2023-02-13.
func TestFromDebeziumTypeToTime(t *testing.T) {
	dt, err := FromDebeziumTypeToTime(Date, int64(19401))
	// Check the error before touching dt: on failure dt may be nil, and
	// dereferencing it first would panic and hide the actual error.
	if assert.NoError(t, err) {
		assert.Equal(t, "2023-02-13", dt.String(""))
	}
}

// TestFromDebeziumTypeTimePrecisionConnect exercises FromDebeziumTypeToTime
// for the three Kafka Connect temporal types: timestamp, time and date.
func TestFromDebeziumTypeTimePrecisionConnect(t *testing.T) {
	// Timestamp
	ts, tsErr := FromDebeziumTypeToTime(DateTimeKafkaConnect, 1678901050700)
	assert.NoError(t, tsErr)
	wantTimestamp := time.Date(2023, time.March, 15, 17, 24, 10, 700_000_000, time.UTC)
	assert.Equal(t, wantTimestamp, ts.Time)

	// Time
	tm, tmErr := FromDebeziumTypeToTime(TimeKafkaConnect, 54720000)
	assert.NoError(t, tmErr)
	assert.Equal(t, "15:12:00+00", tm.String(""))

	// Date
	dt, dtErr := FromDebeziumTypeToTime(DateKafkaConnect, 19429)
	assert.NoError(t, dtErr)
	assert.Equal(t, "2023-03-13", dt.String(""))
}

func TestField_DecodeDecimal(t *testing.T) {
Expand Down

0 comments on commit 9ec6c97

Please sign in to comment.