Skip to content

Commit

Permalink
Merge branch 'master' into default-value-types
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 3, 2024
2 parents 7d6b09a + 6f8db5a commit 966f280
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 144 deletions.
21 changes: 9 additions & 12 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,18 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
assert.Equal(r.T(), evtData["id"], int64(1001))
assert.Equal(r.T(), evtData["another_id"], int64(333))
assert.Equal(r.T(), typing.ParseValue(typing.Settings{}, "another_id", evt.GetOptionalSchema(), evtData["another_id"]), typing.Integer)

assert.Equal(r.T(), evtData["email"], "[email protected]")

// Datetime without TZ is emitted in microseconds which is 1000x larger than nanoseconds.
td := time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC)
assert.Equal(r.T(), evtData["ts_no_tz1"], &ext.ExtendedTime{
Time: td,
NestedKind: ext.NestedKind{
Type: ext.DateTimeKindType,
Format: time.RFC3339Nano,
},
})

assert.Equal(r.T(), time.Date(2023, time.February, 2,
17, 54, 11, 451000000, time.UTC), evt.GetExecutionTime())
assert.Equal(
r.T(),
ext.NewExtendedTime(
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
ext.DateTimeKindType, ext.RFC3339Microsecond,
),
evtData["ts_no_tz1"],
)
assert.Equal(r.T(), time.Date(2023, time.February, 2, 17, 54, 11, 451000000, time.UTC), evt.GetExecutionTime())
assert.Equal(r.T(), "customers", evt.GetTableName())
}

Expand Down
56 changes: 56 additions & 0 deletions lib/debezium/converters/timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package converters

import (
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

type Timestamp struct{}

func (Timestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)
}

func (Timestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of milliseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMilli(castedValue).In(time.UTC), ext.DateTimeKindType, ext.RFC3339Millisecond), nil
}

type MicroTimestamp struct{}

func (MicroTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)
}

func (MicroTimestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of microseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue).In(time.UTC), ext.DateTimeKindType, ext.RFC3339Microsecond), nil
}

type NanoTimestamp struct{}

func (NanoTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)
}

func (NanoTimestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of nanoseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue/1_000).In(time.UTC), ext.DateTimeKindType, ext.RFC3339Nanosecond), nil
}
72 changes: 72 additions & 0 deletions lib/debezium/converters/timestamp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package converters

import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)

func TestTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType), Timestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := Timestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
// Valid conversion
converted, err := Timestamp{}.Convert(int64(1_725_058_799_089))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.089Z", converted.(*ext.ExtendedTime).String(""))
}
{
// ms is preserved despite it being all zeroes.
converted, err := Timestamp{}.Convert(int64(1_725_058_799_000))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.000Z", converted.(*ext.ExtendedTime).String(""))
}
}

func TestMicroTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType), MicroTimestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := MicroTimestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
// Valid conversion
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_827_923))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827923Z", converted.(*ext.ExtendedTime).String(""))
}
{
// micros is preserved despite it being all zeroes.
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_820_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.820000Z", converted.(*ext.ExtendedTime).String(""))
}
}

func TestNanoTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType), NanoTimestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := NanoTimestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
// Valid conversion
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_001_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827001000Z", converted.(*ext.ExtendedTime).String(""))
}
{
// nanos is preserved despite it being all zeroes.
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_000_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827000000Z", converted.(*ext.ExtendedTime).String(""))
}
}
11 changes: 8 additions & 3 deletions lib/debezium/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

type Schema struct {
Expand Down Expand Up @@ -98,12 +97,20 @@ func (f Field) ToValueConverter() converters.ValueConverter {
return converters.JSON{}
case Date, DateKafkaConnect:
return converters.Date{}
// Time
case Time, TimeKafkaConnect:
return converters.Time{}
case NanoTime:
return converters.NanoTime{}
case MicroTime:
return converters.MicroTime{}
// Timestamp
case Timestamp, TimestampKafkaConnect:
return converters.Timestamp{}
case MicroTimestamp:
return converters.MicroTimestamp{}
case NanoTimestamp:
return converters.NanoTimestamp{}
}

return nil
Expand All @@ -120,8 +127,6 @@ func (f Field) ToKindDetails() typing.KindDetails {
// We'll first cast based on Debezium types
// Then, we'll fall back on the actual data types.
switch f.DebeziumType {
case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect:
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)
case KafkaDecimalType:
scale, precisionPtr, err := f.GetScaleAndPrecision()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestField_ToKindDetails(t *testing.T) {
{
// Timestamp
// Datetime (for now)
for _, dbzType := range []SupportedDebeziumType{Timestamp, DateTimeKafkaConnect, MicroTimestamp, NanoTimestamp, DateTimeWithTimezone} {
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp, DateTimeWithTimezone} {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType), Field{DebeziumType: dbzType}.ToKindDetails())
}
}
Expand Down
55 changes: 10 additions & 45 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"

Expand Down Expand Up @@ -33,23 +32,26 @@ const (
EnumSet SupportedDebeziumType = "io.debezium.data.EnumSet"
UUID SupportedDebeziumType = "io.debezium.data.Uuid"

Timestamp SupportedDebeziumType = "io.debezium.time.Timestamp"
MicroTimestamp SupportedDebeziumType = "io.debezium.time.MicroTimestamp"
NanoTimestamp SupportedDebeziumType = "io.debezium.time.NanoTimestamp"
// Dates
Date SupportedDebeziumType = "io.debezium.time.Date"
Year SupportedDebeziumType = "io.debezium.time.Year"
DateKafkaConnect SupportedDebeziumType = "org.apache.kafka.connect.data.Date"
DateTimeWithTimezone SupportedDebeziumType = "io.debezium.time.ZonedTimestamp"
MicroDuration SupportedDebeziumType = "io.debezium.time.MicroDuration"
DateKafkaConnect SupportedDebeziumType = "org.apache.kafka.connect.data.Date"
DateTimeKafkaConnect SupportedDebeziumType = "org.apache.kafka.connect.data.Timestamp"
Year SupportedDebeziumType = "io.debezium.time.Year"

// All the possible time data types
// Time
Time SupportedDebeziumType = "io.debezium.time.Time"
MicroTime SupportedDebeziumType = "io.debezium.time.MicroTime"
NanoTime SupportedDebeziumType = "io.debezium.time.NanoTime"
TimeWithTimezone SupportedDebeziumType = "io.debezium.time.ZonedTime"
TimeKafkaConnect SupportedDebeziumType = "org.apache.kafka.connect.data.Time"

// Timestamps
MicroTimestamp SupportedDebeziumType = "io.debezium.time.MicroTimestamp"
NanoTimestamp SupportedDebeziumType = "io.debezium.time.NanoTimestamp"
Timestamp SupportedDebeziumType = "io.debezium.time.Timestamp"
TimestampKafkaConnect SupportedDebeziumType = "org.apache.kafka.connect.data.Timestamp"

KafkaDecimalType SupportedDebeziumType = "org.apache.kafka.connect.data.Decimal"
KafkaVariableNumericType SupportedDebeziumType = "io.debezium.data.VariableScaleDecimal"

Expand Down Expand Up @@ -156,18 +158,6 @@ func (f Field) ParseValue(value any) (any, error) {
return f.DecodeDecimal(bytes)
case KafkaVariableNumericType:
return f.DecodeDebeziumVariableDecimal(value)
case
Timestamp,
MicroTimestamp,
NanoTimestamp,
NanoTime,
MicroTime,
DateTimeKafkaConnect:
int64Value, ok := value.(int64)
if !ok {
return nil, fmt.Errorf("expected int64 got '%v' with type %T", value, value)
}
return FromDebeziumTypeToTime(f.DebeziumType, int64Value)
}

if bytes, ok := value.([]byte); ok {
Expand All @@ -180,31 +170,6 @@ func (f Field) ParseValue(value any) (any, error) {
return value, nil
}

// FromDebeziumTypeToTime is implemented by following this spec: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types
func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ext.ExtendedTime, error) {
var extTime *ext.ExtendedTime

switch supportedType {
case Timestamp, DateTimeKafkaConnect:
// Represents the number of milliseconds since the epoch, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMilli(val).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano)
case MicroTimestamp:
// Represents the number of microseconds since the epoch, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMicro(val).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano)
case NanoTimestamp:
// Represents the number of nanoseconds past the epoch, and does not include timezone information.
extTime = ext.NewExtendedTime(time.UnixMicro(val/1_000).In(time.UTC), ext.DateTimeKindType, time.RFC3339Nano)
default:
return nil, fmt.Errorf("supportedType: %s, val: %v failed to be matched", supportedType, val)
}

if extTime != nil && !extTime.IsValid() {
return nil, fmt.Errorf("extTime is invalid: %v", extTime)
}

return extTime, nil
}

// DecodeDecimal is used to handle `org.apache.kafka.connect.data.Decimal` where this would be emitted by Debezium when the `decimal.handling.mode` is `precise`
// * Encoded - takes the encoded value as a slice of bytes
// * Parameters - which contains:
Expand Down
Loading

0 comments on commit 966f280

Please sign in to comment.