From 66ae321f0b2f1979774a53d95bd380334697f179 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 11:36:17 -0700 Subject: [PATCH 01/10] Clean up. --- clients/bigquery/converters/converters.go | 18 +++++++++++++++--- clients/bigquery/storagewrite.go | 2 +- lib/typing/ext/time.go | 5 ----- lib/typing/typing.go | 4 ++++ 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/clients/bigquery/converters/converters.go b/clients/bigquery/converters/converters.go index e071e49c7..efe9952e5 100644 --- a/clients/bigquery/converters/converters.go +++ b/clients/bigquery/converters/converters.go @@ -4,13 +4,21 @@ import ( "fmt" "strconv" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" ) -type StringConverter struct{} +type StringConverter struct { + kd typing.KindDetails +} + +func NewStringConverter(kd typing.KindDetails) StringConverter { + return StringConverter{kd: kd} +} -func (StringConverter) Convert(value any) (any, error) { +func (s StringConverter) Convert(value any) (any, error) { switch castedValue := value.(type) { case string: return castedValue, nil @@ -19,7 +27,11 @@ func (StringConverter) Convert(value any) (any, error) { case bool: return fmt.Sprint(castedValue), nil case *ext.ExtendedTime: - return castedValue.String(""), nil + if err := s.kd.EnsureExtendedTimeDetails(); err != nil { + return nil, err + } + + return castedValue.GetTime().Format(s.kd.ExtendedTimeDetails.Format), nil default: return nil, fmt.Errorf("expected string/*decimal.Decimal/bool received %T with value %v", value, value) } diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 63605a827..d6933eb98 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -158,7 +158,7 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto message.Set(field, protoreflect.ValueOfString(decimalValue.String())) case typing.String.Kind: - val, err := converters.StringConverter{}.Convert(value) + val, err := converters.NewStringConverter(column.KindDetails).Convert(value) if err != nil { return nil, err } diff --git a/lib/typing/ext/time.go b/lib/typing/ext/time.go index d84c2ea5b..93243eb41 100644 --- a/lib/typing/ext/time.go +++ b/lib/typing/ext/time.go @@ -81,8 +81,3 @@ func (e *ExtendedTime) GetTime() time.Time { func (e *ExtendedTime) GetNestedKind() NestedKind { return e.nestedKind } - -func (e *ExtendedTime) String(overrideFormat string) string { - format := cmp.Or(overrideFormat, e.nestedKind.Format) - return e.ts.Format(format) -} diff --git a/lib/typing/typing.go b/lib/typing/typing.go index 1441ddaf3..38c71af5d 100644 --- a/lib/typing/typing.go +++ b/lib/typing/typing.go @@ -33,6 +33,10 @@ func (k *KindDetails) EnsureExtendedTimeDetails() error { return fmt.Errorf("extended time details is not set") } + if k.ExtendedTimeDetails.Format == "" { + return fmt.Errorf("extended time details format is not set") + } + return nil } From d0259a22d509488b93b646fd990387e067dc7d8a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 11:41:14 -0700 Subject: [PATCH 02/10] WIP. --- clients/bigquery/converters/converters.go | 1 - .../bigquery/converters/converters_test.go | 19 +++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/clients/bigquery/converters/converters.go b/clients/bigquery/converters/converters.go index efe9952e5..b396b2f31 100644 --- a/clients/bigquery/converters/converters.go +++ b/clients/bigquery/converters/converters.go @@ -5,7 +5,6 @@ import ( "strconv" "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" ) diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index 1d2cdf359..65e40d06d 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/numbers" @@ -12,33 +14,38 @@ import ( ) func TestStringConverter_Convert(t *testing.T) { - converter := StringConverter{} { // String - val, err := converter.Convert("foo") + val, err := NewStringConverter(typing.String).Convert("foo") assert.NoError(t, err) assert.Equal(t, "foo", val) } { // Decimal - val, err := converter.Convert(decimal.NewDecimal(numbers.MustParseDecimal("123"))) + val, err := NewStringConverter(typing.EDecimal).Convert(decimal.NewDecimal(numbers.MustParseDecimal("123"))) assert.NoError(t, err) assert.Equal(t, "123", val) } { // Boolean - val, err := converter.Convert(true) + val, err := NewStringConverter(typing.Boolean).Convert(true) assert.NoError(t, err) assert.Equal(t, "true", val) } { // Invalid - _, err := converter.Convert(123) + _, err := NewStringConverter(typing.Integer).Convert(123) assert.ErrorContains(t, err, "expected string/*decimal.Decimal/bool received int with value 123") } { // Extended time - val, err := converter.Convert(ext.NewExtendedTime(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), ext.TimestampTZKindType, "")) + nestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, "") + assert.NoError(t, err) + + kd := typing.String + kd.ExtendedTimeDetails = &nestedKind + + val, err := NewStringConverter(kd).Convert(ext.NewExtendedTime(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), ext.TimestampTZKindType, "")) assert.NoError(t, err) assert.Equal(t, "2021-01-01T00:00:00Z", val) } From 75aff025b4ac64aae07a2eff703edeb8d840b42b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:17:23 -0700 Subject: [PATCH 03/10] Fix tests. --- clients/snowflake/snowflake_test.go | 4 ++-- clients/snowflake/staging.go | 2 ++ lib/typing/values/string_test.go | 3 +-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 720a404c1..06dfed4ad 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -224,7 +224,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, // Add kindDetails to created_at - "created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)), + "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), } var cols columns.Columns @@ -273,7 +273,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { inMemColumns := tableData.ReadOnlyInMemoryCols() // Since sflkColumns overwrote the format, let's set it correctly again. - inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)))) + inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, time.RFC3339Nano))) tableData.SetInMemoryColumns(inMemColumns) break } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index b71a912d3..9fd8c85cf 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -49,6 +49,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { value, err := values.ToString(colVal, colKind) if err != nil { + fmt.Println("colKind", colKind, "extendedTimeDetails", *colKind.ExtendedTimeDetails, "colVal", colVal, fmt.Sprintf("type: %T", colVal)) return "", err } @@ -124,6 +125,7 @@ func (s *Store) writeTemporaryTableFile(tableData *optimization.TableData, newTa for _, col := range columns { castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails) if castErr != nil { + fmt.Println("col.Name()", col.Name()) return "", castErr } diff --git a/lib/typing/values/string_test.go b/lib/typing/values/string_test.go index 5cc75878b..54738cd8d 100644 --- a/lib/typing/values/string_test.go +++ b/lib/typing/values/string_test.go @@ -33,8 +33,7 @@ func TestToString(t *testing.T) { assert.ErrorContains(t, err, "extended time details is not set") } { - eTimeCol := columns.NewColumn("time", typing.ETime) - eTimeCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{Type: ext.TimeKindType} + eTimeCol := columns.NewColumn("time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")) { // Using [string] val, err := ToString("2021-01-01T03:52:00Z", eTimeCol.KindDetails) From b4d2ca0889ac74774ca42b80f07a383cead34b4b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:18:37 -0700 Subject: [PATCH 04/10] Clean up. --- lib/optimization/table_data.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index bae801623..b4eae29ce 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -1,6 +1,7 @@ package optimization import ( + "cmp" "fmt" "strings" "time" @@ -273,7 +274,6 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro if found { inMemoryCol.KindDetails.Kind = foundColumn.KindDetails.Kind - // Copy over backfilled inMemoryCol.SetBackfilled(foundColumn.Backfilled()) @@ -300,8 +300,10 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro } } - // Just copy over the type since the format wouldn't be present in the destination + // Copy over the type inMemoryCol.KindDetails.ExtendedTimeDetails.Type = foundColumn.KindDetails.ExtendedTimeDetails.Type + // If the in-memory column has no format, we should use the format from the destination. + inMemoryCol.KindDetails.ExtendedTimeDetails.Format = cmp.Or(inMemoryCol.KindDetails.ExtendedTimeDetails.Format, foundColumn.KindDetails.ExtendedTimeDetails.Format) } From 952cf235305ced0e310ada2696d4b6a6de47aadf Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:19:31 -0700 Subject: [PATCH 05/10] Clean up. --- clients/snowflake/staging.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 9fd8c85cf..b71a912d3 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -49,7 +49,6 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { value, err := values.ToString(colVal, colKind) if err != nil { - fmt.Println("colKind", colKind, "extendedTimeDetails", *colKind.ExtendedTimeDetails, "colVal", colVal, fmt.Sprintf("type: %T", colVal)) return "", err } @@ -125,7 +124,6 @@ func (s *Store) writeTemporaryTableFile(tableData *optimization.TableData, newTa for _, col := range columns { castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails) if castErr != nil { - fmt.Println("col.Name()", col.Name()) return "", castErr } From 124de661ede1ae6afb9190cdba28440655fab50c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:20:39 -0700 Subject: [PATCH 06/10] Clean up. --- clients/snowflake/staging.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index b71a912d3..d8c768482 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -119,18 +119,18 @@ func (s *Store) writeTemporaryTableFile(tableData *optimization.TableData, newTa writer.Comma = '\t' columns := tableData.ReadOnlyInMemoryCols().ValidColumns() - for _, value := range tableData.Rows() { - var row []string + for _, row := range tableData.Rows() { + var csvRow []string for _, col := range columns { - castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails) + castedValue, castErr := castColValStaging(row[col.Name()], col.KindDetails) if castErr != nil { - return "", castErr + return "", fmt.Errorf("failed to cast value '%v': %w", row[col.Name()], castErr) } - row = append(row, castedValue) + csvRow = append(csvRow, castedValue) } - if err = writer.Write(row); err != nil { + if err = writer.Write(csvRow); err != nil { return "", fmt.Errorf("failed to write to csv: %w", err) } } From 4f13660b7a441f9eb5708dd8efa289f9dd6e1efe Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:21:08 -0700 Subject: [PATCH 07/10] Adding a TODO. --- lib/optimization/table_data.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index b4eae29ce..00be8988d 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -273,6 +273,7 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro } if found { + // TODO: Move this whole block into a function and add unit-tests. inMemoryCol.KindDetails.Kind = foundColumn.KindDetails.Kind // Copy over backfilled inMemoryCol.SetBackfilled(foundColumn.Backfilled()) From 9dcf5cfb9b40d43fc7f3df2b7118304ac6753c9f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:22:30 -0700 Subject: [PATCH 08/10] Clean up. --- clients/bigquery/converters/converters_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index 65e40d06d..e58527057 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -4,11 +4,10 @@ import ( "testing" "time" - "github.com/artie-labs/transfer/lib/typing" - "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/numbers" + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" ) From 5dbbb76d579a7736fb9f234e0933c95ff4c54033 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:25:17 -0700 Subject: [PATCH 09/10] Clean up. --- clients/bigquery/converters/converters_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index e58527057..75c1d4863 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -38,13 +38,13 @@ func TestStringConverter_Convert(t *testing.T) { } { // Extended time - nestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, "") - assert.NoError(t, err) - - kd := typing.String - kd.ExtendedTimeDetails = &nestedKind - - val, err := NewStringConverter(kd).Convert(ext.NewExtendedTime(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), ext.TimestampTZKindType, "")) + val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimestampTZKindType, "")).Convert( + ext.NewExtendedTime( + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + ext.TimestampTZKindType, + "", + ), + ) assert.NoError(t, err) assert.Equal(t, "2021-01-01T00:00:00Z", val) } From 0fc2686924e7b6db475fbfa3bf3047031201f0e2 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 23 Oct 2024 13:30:53 -0700 Subject: [PATCH 10/10] Fix test and update comment. --- lib/optimization/table_data_merge_columns_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/optimization/table_data_merge_columns_test.go b/lib/optimization/table_data_merge_columns_test.go index 37a51c3fa..ac621dc8c 100644 --- a/lib/optimization/table_data_merge_columns_test.go +++ b/lib/optimization/table_data_merge_columns_test.go @@ -66,8 +66,8 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { assert.True(t, isOk) assert.Equal(t, typing.ETime.Kind, updatedColumn.KindDetails.Kind) assert.Equal(t, ext.DateKindType, updatedColumn.KindDetails.ExtendedTimeDetails.Type) - // Format is not copied over. - assert.Equal(t, "", updatedColumn.KindDetails.ExtendedTimeDetails.Format) + // Format is copied over. + assert.Equal(t, ext.PostgresDateFormat, updatedColumn.KindDetails.ExtendedTimeDetails.Format) } { // In-memory column is NUMERIC and destination column is an INTEGER