From f9bd23b0bc0944ad843b656ac01178b6e6cb2b84 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 4 Sep 2024 11:10:01 -0700 Subject: [PATCH 1/3] [Debezium] Adding LTree support. (#877) --- lib/debezium/schema.go | 2 +- lib/debezium/schema_test.go | 4 ++++ lib/debezium/types.go | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 880553b38..9904956d0 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -83,7 +83,7 @@ func (f Field) GetScaleAndPrecision() (int32, *int32, error) { func (f Field) ToValueConverter() converters.ValueConverter { switch f.DebeziumType { - case UUID, Enum: + case UUID, LTree, Enum: return converters.StringPassthrough{} case DateTimeWithTimezone: return converters.DateTimeWithTimezone{} diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index 70dfb3e9c..f9081827a 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -124,6 +124,10 @@ func TestField_ToKindDetails(t *testing.T) { // Enum assert.Equal(t, typing.String, Field{DebeziumType: Enum, Type: String}.ToKindDetails()) } + { + // LTree + assert.Equal(t, typing.String, Field{DebeziumType: LTree, Type: String}.ToKindDetails()) + } } { // Structs diff --git a/lib/debezium/types.go b/lib/debezium/types.go index 76362434b..1010a6381 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -32,6 +32,7 @@ const ( Enum SupportedDebeziumType = "io.debezium.data.Enum" EnumSet SupportedDebeziumType = "io.debezium.data.EnumSet" UUID SupportedDebeziumType = "io.debezium.data.Uuid" + LTree SupportedDebeziumType = "io.debezium.data.Ltree" // Dates Date SupportedDebeziumType = "io.debezium.time.Date" From 59041560ecba5456009625a98f89da654f74f098 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 4 Sep 2024 11:14:48 -0700 Subject: [PATCH 2/3] [Debezium] Adding Year converter (#875) --- lib/debezium/converters/basic.go | 14 ++++++++++++++ lib/debezium/converters/basic_test.go | 14 ++++++++++++++ lib/debezium/schema.go | 2 ++ lib/debezium/schema_test.go | 5 +++++ lib/debezium/types_test.go | 15 +++++++++++++++ 5 files changed, 50 insertions(+) diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index 4f23ac803..b6b92392e 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -26,3 +26,17 @@ func (JSON) Convert(value any) (any, error) { func (JSON) ToKindDetails() typing.KindDetails { return typing.Struct } + +type Year struct{} + +func (Year) ToKindDetails() typing.KindDetails { + return typing.Integer +} + +func (Year) Convert(value any) (any, error) { + if _, err := typing.AssertType[int64](value); err != nil { + return nil, err + } + + return value, nil +} diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index 31952da41..d6c6434d8 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -20,3 +20,17 @@ func TestJSON_Convert(t *testing.T) { assert.Equal(t, `{"a":2}`, value) } } + +func TestYear_Convert(t *testing.T) { + { + // Wrong data type + _, err := Year{}.Convert("123") + assert.ErrorContains(t, err, "expected type int64, got string") + } + { + // Valid data type + value, err := Year{}.Convert(int64(2024)) + assert.NoError(t, err) + assert.Equal(t, int64(2024), value) + } +} diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 9904956d0..d8abbe7fe 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -97,6 +97,8 @@ func (f Field) ToValueConverter() converters.ValueConverter { return converters.JSON{} case Date, DateKafkaConnect: return converters.Date{} + case Year: + return &converters.Year{} // Time case Time, TimeKafkaConnect: return converters.Time{} diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index f9081827a..5f15f1e6e 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -168,4 +168,9 @@ func TestField_ToKindDetails(t *testing.T) { assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), Field{DebeziumType: dbzType}.ToKindDetails()) } } + { + // Basic + assert.Equal(t, typing.Integer, Field{DebeziumType: Year}.ToKindDetails()) + assert.Equal(t, typing.Struct, Field{DebeziumType: JSON}.ToKindDetails()) + } } diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index a45994884..08211d850 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -139,6 +139,21 @@ func TestField_ParseValue(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "dusty", value) } + { + // Year + { + // Floats (from JSON marshal), preprocessing should convert it to int64. + value, err := Field{Type: Int32, DebeziumType: Year}.ParseValue(2024.0) + assert.NoError(t, err) + assert.Equal(t, int64(2024), value) + } + { + // Int32 + value, err := Field{Type: Int32, DebeziumType: Year}.ParseValue(int32(2024)) + assert.NoError(t, err) + assert.Equal(t, int64(2024), value) + } + } { // JSON field := Field{Type: String, DebeziumType: JSON} From 0d0c9b96a0ff1124659af737b36bf637a07bb50a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 4 Sep 2024 11:27:40 -0700 Subject: [PATCH 3/3] [Debezium] Int64 Passthrough (#878) --- lib/debezium/converters/basic.go | 6 +++--- lib/debezium/converters/basic_test.go | 6 +++--- lib/debezium/schema.go | 6 ++++-- lib/debezium/schema_test.go | 7 ++++++- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/debezium/converters/basic.go b/lib/debezium/converters/basic.go index b6b92392e..07ffe47e5 100644 --- a/lib/debezium/converters/basic.go +++ b/lib/debezium/converters/basic.go @@ -27,13 +27,13 @@ func (JSON) ToKindDetails() typing.KindDetails { return typing.Struct } -type Year struct{} +type Int64Passthrough struct{} -func (Year) ToKindDetails() typing.KindDetails { +func (Int64Passthrough) ToKindDetails() typing.KindDetails { return typing.Integer } -func (Year) Convert(value any) (any, error) { +func (Int64Passthrough) Convert(value any) (any, error) { if _, err := typing.AssertType[int64](value); err != nil { return nil, err } diff --git a/lib/debezium/converters/basic_test.go b/lib/debezium/converters/basic_test.go index d6c6434d8..dfe039e2c 100644 --- a/lib/debezium/converters/basic_test.go +++ b/lib/debezium/converters/basic_test.go @@ -21,15 +21,15 @@ func TestJSON_Convert(t *testing.T) { } } -func TestYear_Convert(t *testing.T) { +func TestInt64Passthrough_Convert(t *testing.T) { { // Wrong data type - _, err := Year{}.Convert("123") + _, err := Int64Passthrough{}.Convert("123") assert.ErrorContains(t, err, "expected type int64, got string") } { // Valid data type - value, err := Year{}.Convert(int64(2024)) + value, err := Int64Passthrough{}.Convert(int64(2024)) assert.NoError(t, err) assert.Equal(t, int64(2024), value) } diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index d8abbe7fe..a2aeedccd 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -83,8 +83,11 @@ func (f Field) GetScaleAndPrecision() (int32, *int32, error) { func (f Field) ToValueConverter() converters.ValueConverter { switch f.DebeziumType { + // Passthrough converters case UUID, LTree, Enum: return converters.StringPassthrough{} + case Year, MicroDuration: + return &converters.Int64Passthrough{} case DateTimeWithTimezone: return converters.DateTimeWithTimezone{} case TimeWithTimezone: @@ -97,8 +100,7 @@ func (f Field) ToValueConverter() converters.ValueConverter { return converters.JSON{} case Date, DateKafkaConnect: return converters.Date{} - case Year: - return &converters.Year{} + // Time case Time, TimeKafkaConnect: return converters.Time{} diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index 5f15f1e6e..5c1907cad 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -170,7 +170,12 @@ func TestField_ToKindDetails(t *testing.T) { } { // Basic - assert.Equal(t, typing.Integer, Field{DebeziumType: Year}.ToKindDetails()) + { + // Int64 Passthrough + assert.Equal(t, typing.Integer, Field{DebeziumType: Year}.ToKindDetails()) + assert.Equal(t, typing.Integer, Field{DebeziumType: MicroDuration}.ToKindDetails()) + } + assert.Equal(t, typing.Struct, Field{DebeziumType: JSON}.ToKindDetails()) } }