From 9a450ff09eaa5605339bae6d8ad0a93e4975ec0f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 17 Oct 2024 11:05:34 -0700 Subject: [PATCH] WIP. --- clients/bigquery/storagewrite_test.go | 2 +- clients/snowflake/ddl_test.go | 16 +++++++++-- clients/snowflake/snowflake_test.go | 5 +++- lib/debezium/converters/time_test.go | 21 ++++++++++---- lib/debezium/converters/timestamp_test.go | 24 ++++++++++++++-- lib/debezium/schema_test.go | 34 +++++++++++++++++++---- lib/destination/ddl/ddl_sflk_test.go | 20 ++++++++++--- 7 files changed, 99 insertions(+), 23 deletions(-) diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go index ed9ce591a..f15ae75c0 100644 --- a/clients/bigquery/storagewrite_test.go +++ b/clients/bigquery/storagewrite_test.go @@ -72,7 +72,7 @@ func TestColumnToTableFieldSchema(t *testing.T) { { // ETime - Invalid: _, err := typing.NewTimeDetailsFromTemplate(typing.ETime, "", "") - assert.ErrorContains(t, err, "unsupported extended time details type:") + assert.ErrorContains(t, err, "unsupported extended time kind type:") } { // Struct: diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index 9847ed535..58834f8ef 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -23,13 +23,16 @@ import ( func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { tableID := NewTableIdentifier("coffee_shop", "public", "orders") + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(s.T(), err) + var cols columns.Columns for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), + "created_at": _timestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } @@ -52,13 +55,17 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { tableID := NewTableIdentifier("coffee_shop", "orders", "public") + + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(s.T(), err) + var cols columns.Columns for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), + "created_at": _timestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } @@ -91,13 +98,16 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { } func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() { + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(s.T(), err) + var cols columns.Columns for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), + "created_at": _timestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 838fdef2f..0727da407 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -238,9 +238,12 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { tableData.InsertRow(pk, row, false) } + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(s.T(), err) + snowflakeColToKindDetailsMap := map[string]typing.KindDetails{ "id": typing.Integer, - "created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), + "created_at": _timestampTZ, "name": typing.String, constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, diff --git a/lib/debezium/converters/time_test.go b/lib/debezium/converters/time_test.go index e45f2b585..d18cec4e6 100644 --- a/lib/debezium/converters/time_test.go +++ b/lib/debezium/converters/time_test.go @@ -4,11 +4,10 @@ import ( "testing" "time" - "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/stretchr/testify/assert" ) func TestZonedTimestamp_Convert(t *testing.T) { @@ -126,7 +125,13 @@ func TestTime_Convert(t *testing.T) { } func TestNanoTime_Converter(t *testing.T) { - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), NanoTime{}.ToKindDetails()) + _time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "") + assert.NoError(t, err) + + kd, err := NanoTime{}.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, _time, kd) { // Invalid data _, err := NanoTime{}.Convert("123") @@ -141,7 +146,13 @@ func TestNanoTime_Converter(t *testing.T) { } func TestMicroTime_Converter(t *testing.T) { - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), MicroTime{}.ToKindDetails()) + _time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "") + assert.NoError(t, err) + + kd, err := MicroTime{}.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, _time, kd) { // Invalid data _, err := MicroTime{}.Convert("123") diff --git a/lib/debezium/converters/timestamp_test.go b/lib/debezium/converters/timestamp_test.go index 9ca319a29..bc3cecb31 100644 --- a/lib/debezium/converters/timestamp_test.go +++ b/lib/debezium/converters/timestamp_test.go @@ -9,7 +9,13 @@ import ( ) func TestTimestamp_Converter(t *testing.T) { - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), Timestamp{}.ToKindDetails()) + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(t, err) + + kd, err := Timestamp{}.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, _timestampTZ, kd) { // Invalid conversion _, err := Timestamp{}.Convert("invalid") @@ -30,7 +36,13 @@ func TestTimestamp_Converter(t *testing.T) { } func TestMicroTimestamp_Converter(t *testing.T) { - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), MicroTimestamp{}.ToKindDetails()) + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(t, err) + + kd, err := MicroTimestamp{}.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, _timestampTZ, kd) { // Invalid conversion _, err := MicroTimestamp{}.Convert("invalid") @@ -51,7 +63,13 @@ func TestMicroTimestamp_Converter(t *testing.T) { } func TestNanoTimestamp_Converter(t *testing.T) { - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), NanoTimestamp{}.ToKindDetails()) + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(t, err) + + kd, err := NanoTimestamp{}.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, _timestampTZ, kd) { // Invalid conversion _, err := NanoTimestamp{}.Convert("invalid") diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index 197f08b8f..65acb1d62 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -231,18 +231,37 @@ func TestField_ToKindDetails(t *testing.T) { { // Timestamp // Datetime (for now) - for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp, ZonedTimestamp} { - kd, err := Field{DebeziumType: dbzType}.ToKindDetails() - assert.NoError(t, err) - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), kd) + { + for _, dbzType := range []SupportedDebeziumType{MicroTimestamp, NanoTimestamp, ZonedTimestamp} { + kd, err := Field{DebeziumType: dbzType}.ToKindDetails() + assert.NoError(t, err) + + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(t, err) + assert.Equal(t, _timestampTZ, kd) + } + } + { + for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect} { + kd, err := Field{DebeziumType: dbzType}.ToKindDetails() + assert.NoError(t, err) + + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(t, err) + assert.Equal(t, _timestampTZ, kd) + } } + } { // Dates for _, dbzType := range []SupportedDebeziumType{Date, DateKafkaConnect} { kd, err := Field{DebeziumType: dbzType}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.DateKindType), kd) + + _date, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.DateKindType, "") + assert.NoError(t, err) + assert.Equal(t, _date, kd) } } { @@ -250,7 +269,10 @@ func TestField_ToKindDetails(t *testing.T) { for _, dbzType := range []SupportedDebeziumType{Time, TimeKafkaConnect, MicroTime, NanoTime, TimeWithTimezone} { kd, err := Field{DebeziumType: dbzType}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), kd) + + _time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "") + assert.NoError(t, err) + assert.Equal(t, _time, kd) } } { diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 57620c1b8..827a2c536 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -50,8 +50,11 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { } func (d *DDLTestSuite) TestAlterIdempotency() { + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(d.T(), err) + cols := []columns.Column{ - columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)), + columns.NewColumn("created_at", _timestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("order_name", typing.String), columns.NewColumn("start", typing.String), @@ -80,8 +83,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() { func (d *DDLTestSuite) TestAlterTableAdd() { // Test adding a bunch of columns + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(d.T(), err) + cols := []columns.Column{ - columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)), + columns.NewColumn("created_at", _timestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("order_name", typing.String), columns.NewColumn("start", typing.String), @@ -122,8 +128,11 @@ func (d *DDLTestSuite) TestAlterTableAdd() { func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { // Test adding a bunch of columns + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(d.T(), err) + cols := []columns.Column{ - columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)), + columns.NewColumn("created_at", _timestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("name", typing.String), columns.NewColumn("start", typing.String), @@ -179,8 +188,11 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { func (d *DDLTestSuite) TestAlterTableDelete() { // Test adding a bunch of columns + _timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "") + assert.NoError(d.T(), err) + cols := []columns.Column{ - columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)), + columns.NewColumn("created_at", _timestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("name", typing.String), columns.NewColumn("col_to_delete", typing.String),