From 104103061c2af62dfa8117de078919b27633f60d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 14:49:20 -0700 Subject: [PATCH 01/10] WIP. --- clients/bigquery/converters/converters.go | 2 ++ .../bigquery/converters/converters_test.go | 9 +------ clients/bigquery/dialect/dialect.go | 10 +++---- clients/bigquery/dialect/dialect_test.go | 4 +-- lib/typing/ext/parse.go | 13 +++++++++ lib/typing/parse.go | 2 +- lib/typing/typing.go | 4 +++ lib/typing/values/string.go | 8 ++++++ lib/typing/values/string_test.go | 27 ++++++++++++------- 9 files changed, 54 insertions(+), 25 deletions(-) diff --git a/clients/bigquery/converters/converters.go b/clients/bigquery/converters/converters.go index aa40ea674..2a52f9e34 100644 --- a/clients/bigquery/converters/converters.go +++ b/clients/bigquery/converters/converters.go @@ -32,6 +32,8 @@ func (s StringConverter) Convert(value any) (any, error) { return castedValue.Format(ext.PostgresDateFormat), nil case typing.TimestampNTZ: return castedValue.Format(ext.RFC3339NoTZ), nil + case typing.TimestampTZ: + return castedValue.Format(time.RFC3339), nil default: return nil, fmt.Errorf("unexpected kind details: %q", s.kd.Kind) } diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index 47fcff97b..7eea88c9e 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -9,7 +9,6 @@ import ( "github.com/artie-labs/transfer/lib/numbers" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/artie-labs/transfer/lib/typing/ext" ) func TestStringConverter_Convert(t *testing.T) { @@ -64,13 +63,7 @@ func TestStringConverter_Convert(t *testing.T) { } { // Extended time - val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimestampTZKindType, "")).Convert( - ext.NewExtendedTime( - time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), - ext.TimestampTZKindType, - "", - ), - ) + val, err := NewStringConverter(typing.TimestampTZ).Convert(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)) assert.NoError(t, err) assert.Equal(t, "2021-01-01T00:00:00Z", val) } diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index a8e10c9dd..0013130bc 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -51,12 +51,12 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s return "date" case typing.TimestampNTZ.Kind: return "datetime" + case typing.TimestampTZ.Kind: + // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type + // We should be using TIMESTAMP since it's an absolute point in time. + return "timestamp" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { - case ext.TimestampTZKindType: - // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type - // We should be using TIMESTAMP since it's an absolute point in time. - return "timestamp" case ext.TimeKindType: return "time" } @@ -106,7 +106,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD case "array": return typing.Array, nil case "timestamp": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") + return typing.TimestampTZ, nil case "datetime": return typing.TimestampNTZ, nil case "time": diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index a13c86fdd..6d115f2e0 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -93,7 +93,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) { "json": typing.Struct, // Datetime "datetime": typing.TimestampNTZ, - "timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "timestamp": typing.TimestampTZ, "time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), "date": typing.Date, //Invalid @@ -130,7 +130,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) { func TestBigQueryDialect_KindForDataType_NoDataLoss(t *testing.T) { kindDetails := []typing.KindDetails{ - typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + typing.TimestampTZ, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), typing.Date, typing.String, diff --git a/lib/typing/ext/parse.go b/lib/typing/ext/parse.go index 600de6505..5737db0c5 100644 --- a/lib/typing/ext/parse.go +++ b/lib/typing/ext/parse.go @@ -46,6 +46,19 @@ func ParseTimestampNTZFromInterface(val any) (time.Time, error) { } } +func ParseTimestampTZFromInterface(val any) (time.Time, error) { + switch convertedVal := val.(type) { + case time.Time: + return convertedVal, nil + case *ExtendedTime: + return convertedVal.GetTime(), nil + case string: + return parseTimestampTZ(convertedVal) + default: + return time.Time{}, fmt.Errorf("unsupported type: %T", convertedVal) + } +} + func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, error) { switch convertedVal := val.(type) { case nil: diff --git a/lib/typing/parse.go b/lib/typing/parse.go index fd14c4205..17e99e5a4 100644 --- a/lib/typing/parse.go +++ b/lib/typing/parse.go @@ -50,7 +50,7 @@ func ParseValue(key string, optionalSchema map[string]KindDetails, val any) (Kin ExtendedDecimalDetails: &extendedDetails, }, nil case time.Time: - return NewExtendedTimeDetails(ETime, ext.TimestampTZKindType, "") + return TimestampTZ, nil case *ext.ExtendedTime: nestedKind := convertedVal.GetNestedKind() return KindDetails{ diff --git a/lib/typing/typing.go b/lib/typing/typing.go index 34034d9e3..642ced4c4 100644 --- a/lib/typing/typing.go +++ b/lib/typing/typing.go @@ -83,6 +83,10 @@ var ( Kind: "timestamp_ntz", } + TimestampTZ = KindDetails{ + Kind: "timestamp_tz", + } + ETime = KindDetails{ Kind: "extended_time", } diff --git a/lib/typing/values/string.go b/lib/typing/values/string.go index 0bb200f87..c49e44474 100644 --- a/lib/typing/values/string.go +++ b/lib/typing/values/string.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/stringutil" @@ -41,6 +42,13 @@ func ToString(colVal any, colKind typing.KindDetails) (string, error) { } return _time.Format(ext.RFC3339NoTZ), nil + case typing.TimestampTZ.Kind: + _time, err := ext.ParseTimestampTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) + } + + return _time.Format(time.RFC3339Nano), nil case typing.ETime.Kind: if err := colKind.EnsureExtendedTimeDetails(); err != nil { return "", err diff --git a/lib/typing/values/string_test.go b/lib/typing/values/string_test.go index 2b5b9f142..8a6035817 100644 --- a/lib/typing/values/string_test.go +++ b/lib/typing/values/string_test.go @@ -55,6 +55,21 @@ func TestToString(t *testing.T) { assert.Equal(t, time.Date(2021, time.January, 1, 17, 33, 4, 150_001_123, time.UTC).Format(ext.RFC3339NoTZ), value) } } + { + // Timestamp TZ + { + // time.Time + value, err := ToString(time.Date(2019, time.December, 31, 1, 2, 33, 400_999_991, time.UTC), typing.TimestampTZ) + assert.NoError(t, err) + assert.Equal(t, "2019-12-31T01:02:33.400999991Z", value) + } + { + // String + value, err := ToString("2019-12-31T01:02:33.400999991Z", typing.TimestampTZ) + assert.NoError(t, err) + assert.Equal(t, time.Date(2019, time.December, 31, 1, 2, 33, 400_999_991, time.UTC).Format(time.RFC3339Nano), value) + } + } { // ETime { @@ -72,17 +87,11 @@ func TestToString(t *testing.T) { } { // Using [*ExtendedTime] - format := "2006-01-02T15:04:05Z07:00" - dustyBirthday := time.Date(2019, time.December, 31, 0, 0, 0, 0, time.UTC) - extendedTime := ext.NewExtendedTime(dustyBirthday, ext.TimestampTZKindType, format) - - nestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, format) - assert.NoError(t, err) - - eTimeCol.KindDetails.ExtendedTimeDetails = &nestedKind + dustyBirthday := time.Date(2019, time.December, 31, 9, 27, 22, 0, time.UTC) + extendedTime := ext.NewExtendedTime(dustyBirthday, ext.TimeKindType, "") actualValue, err := ToString(extendedTime, eTimeCol.KindDetails) assert.NoError(t, err) - assert.Equal(t, extendedTime.GetTime().Format(format), actualValue) + assert.Equal(t, "09:27:22", actualValue) } } } From 8bff560cc857674aac9cde6dfe334fbb6a3050be Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 14:49:31 -0700 Subject: [PATCH 02/10] WIP. --- clients/bigquery/converters/converters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/bigquery/converters/converters.go b/clients/bigquery/converters/converters.go index 2a52f9e34..852970db7 100644 --- a/clients/bigquery/converters/converters.go +++ b/clients/bigquery/converters/converters.go @@ -33,7 +33,7 @@ func (s StringConverter) Convert(value any) (any, error) { case typing.TimestampNTZ: return castedValue.Format(ext.RFC3339NoTZ), nil case typing.TimestampTZ: - return castedValue.Format(time.RFC3339), nil + return castedValue.Format(time.RFC3339Nano), nil default: return nil, fmt.Errorf("unexpected kind details: %q", s.kd.Kind) } From 67944d14c1a17b5e9420bd5d860c1d41be8c48db Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 14:58:07 -0700 Subject: [PATCH 03/10] More. --- clients/bigquery/storagewrite.go | 22 ++++---- clients/bigquery/storagewrite_test.go | 14 +++-- clients/databricks/dialect/typing.go | 6 +-- lib/typing/ext/parse.go | 2 - lib/typing/ext/parse_test.go | 76 +++++++++++---------------- lib/typing/ext/time.go | 2 - lib/typing/ext/time_test.go | 35 ------------ 7 files changed, 55 insertions(+), 102 deletions(-) delete mode 100644 lib/typing/ext/time_test.go diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 715b8782f..09fdc589a 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -8,10 +8,6 @@ import ( "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/bigquery/storage/managedwriter/adapt" - "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/types/dynamicpb" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/artie-labs/transfer/clients/bigquery/converters" "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" @@ -19,6 +15,8 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/dynamicpb" ) // columnToTableFieldSchema returns a [*storagepb.TableFieldSchema] suitable for transferring data of the type that the column specifies. @@ -42,12 +40,12 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem fieldType = storagepb.TableFieldSchema_DATE case typing.TimestampNTZ.Kind: fieldType = storagepb.TableFieldSchema_DATETIME + case typing.TimestampTZ.Kind: + fieldType = storagepb.TableFieldSchema_TIMESTAMP case typing.ETime.Kind: switch column.KindDetails.ExtendedTimeDetails.Type { case ext.TimeKindType: fieldType = storagepb.TableFieldSchema_TIME - case ext.TimestampTZKindType: - fieldType = storagepb.TableFieldSchema_TIMESTAMP default: return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type) } @@ -216,6 +214,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto } message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time))) + case typing.TimestampTZ.Kind: + _time, err := ext.ParseTimestampTZFromInterface(value) + if err != nil { + return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err) + } + + message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro())) case typing.ETime.Kind: if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil { return nil, err @@ -229,11 +234,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto switch column.KindDetails.ExtendedTimeDetails.Type { case ext.TimeKindType: message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(_time))) - case ext.TimestampTZKindType: - if err = timestamppb.New(_time).CheckValid(); err != nil { - return nil, err - } - message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro())) default: return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type) } diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go index b17e192b8..5db3b56dd 100644 --- a/clients/bigquery/storagewrite_test.go +++ b/clients/bigquery/storagewrite_test.go @@ -55,8 +55,14 @@ func TestColumnToTableFieldSchema(t *testing.T) { assert.Equal(t, storagepb.TableFieldSchema_DATE, fieldSchema.Type) } { - // ETime - TimestampTZ: - fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))) + // Datetime (TimestampNTZ) + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.TimestampNTZ)) + assert.NoError(t, err) + assert.Equal(t, storagepb.TableFieldSchema_DATETIME, fieldSchema.Type) + } + { + // Timestamp (TimestampTZ) + fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.TimestampTZ)) assert.NoError(t, err) assert.Equal(t, storagepb.TableFieldSchema_TIMESTAMP, fieldSchema.Type) } @@ -168,7 +174,7 @@ func TestRowToMessage(t *testing.T) { columns.NewColumn("c_string", typing.String), columns.NewColumn("c_string_decimal", typing.String), columns.NewColumn("c_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")), - columns.NewColumn("c_timestamp", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("c_timestamp", typing.TimestampTZ), columns.NewColumn("c_date", typing.Date), columns.NewColumn("c_datetime", typing.TimestampNTZ), columns.NewColumn("c_struct", typing.Struct), @@ -189,7 +195,7 @@ func TestRowToMessage(t *testing.T) { "c_string": "foo bar", "c_string_decimal": decimal.NewDecimal(numbers.MustParseDecimal("1.61803")), "c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""), - "c_timestamp": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""), + "c_timestamp": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), "c_date": time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), "c_datetime": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), "c_struct": map[string]any{"baz": []string{"foo", "bar"}}, diff --git a/clients/databricks/dialect/typing.go b/clients/databricks/dialect/typing.go index cf451f86a..e28aebb99 100644 --- a/clients/databricks/dialect/typing.go +++ b/clients/databricks/dialect/typing.go @@ -30,10 +30,10 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) // This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables. // Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html return "TIMESTAMP_NTZ" + case typing.TimestampTZ.Kind: + return "TIMESTAMP" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { - case ext.TimestampTZKindType: - return "TIMESTAMP" case ext.TimeKindType: return "STRING" } @@ -74,7 +74,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD case "smallint", "tinyint": return typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, nil case "timestamp": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") + return typing.TimestampTZ, nil case "timestamp_ntz": return typing.TimestampNTZ, nil } diff --git a/lib/typing/ext/parse.go b/lib/typing/ext/parse.go index 5737db0c5..f70233582 100644 --- a/lib/typing/ext/parse.go +++ b/lib/typing/ext/parse.go @@ -81,8 +81,6 @@ func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, erro func ParseDateTime(value string, kindType ExtendedTimeKindType) (time.Time, error) { switch kindType { - case TimestampTZKindType: - return parseTimestampTZ(value) case TimeKindType: // Try time first if ts, err := parseTime(value); err == nil { diff --git a/lib/typing/ext/parse_test.go b/lib/typing/ext/parse_test.go index 54f86ddc0..4bb515c68 100644 --- a/lib/typing/ext/parse_test.go +++ b/lib/typing/ext/parse_test.go @@ -11,83 +11,69 @@ func TestParseFromInterface(t *testing.T) { { // Extended time var vals []*ExtendedTime - vals = append(vals, NewExtendedTime(time.Now().UTC(), TimestampTZKindType, ISO8601)) vals = append(vals, NewExtendedTime(time.Now().UTC(), TimeKindType, PostgresTimeFormat)) for _, val := range vals { - _time, err := ParseFromInterface(val, TimestampTZKindType) + _time, err := ParseFromInterface(val, TimeKindType) assert.NoError(t, err) assert.Equal(t, val.GetTime(), _time) } } +} + +func TestParseFromInterfaceTime(t *testing.T) { + now := time.Now() + for _, supportedTimeFormat := range SupportedTimeFormats { + _time, err := ParseFromInterface(now.Format(supportedTimeFormat), TimeKindType) + assert.NoError(t, err) + assert.Equal(t, _time.Format(supportedTimeFormat), now.Format(supportedTimeFormat)) + } +} + +func TestParseDateFromInterface(t *testing.T) { + now := time.Now() + for _, supportedDateFormat := range supportedDateFormats { + _time, err := ParseDateFromInterface(now.Format(supportedDateFormat)) + assert.NoError(t, err) + assert.Equal(t, _time.Format(supportedDateFormat), now.Format(supportedDateFormat)) + } +} + +func TestParseTimestampTZFromInterface(t *testing.T) { { // Nil - _, err := ParseFromInterface(nil, TimestampTZKindType) + _, err := ParseTimestampTZFromInterface(nil) assert.ErrorContains(t, err, "val is nil") } { // True - _, err := ParseFromInterface(true, TimestampTZKindType) + _, err := ParseTimestampTZFromInterface(true) assert.ErrorContains(t, err, "failed to parse colVal, expected type string or *ExtendedTime and got: bool") } { // False - _, err := ParseFromInterface(false, TimestampTZKindType) + _, err := ParseTimestampTZFromInterface(false) assert.ErrorContains(t, err, "failed to parse colVal, expected type string or *ExtendedTime and got: bool") } { // String - RFC3339MillisecondUTC - value, err := ParseFromInterface("2024-09-19T16:05:18.630Z", TimestampTZKindType) + value, err := ParseTimestampTZFromInterface("2024-09-19T16:05:18.630Z") assert.NoError(t, err) - assert.Equal(t, "2024-09-19T16:05:18.630Z", value.Format(RFC3339Millisecond)) + assert.Equal(t, "2024-09-19T16:05:18.630Z", value.Format(time.RFC3339Nano)) } { // String - RFC3339MicrosecondUTC - value, err := ParseFromInterface("2024-09-19T16:05:18.630000Z", TimestampTZKindType) + value, err := ParseTimestampTZFromInterface("2024-09-19T16:05:18.630001Z") assert.NoError(t, err) - assert.Equal(t, "2024-09-19T16:05:18.630000Z", value.Format(RFC3339Microsecond)) + assert.Equal(t, "2024-09-19T16:05:18.630001Z", value.Format(time.RFC3339Nano)) } { // String - RFC3339NanosecondUTC - value, err := ParseFromInterface("2024-09-19T16:05:18.630000000Z", TimestampTZKindType) - assert.NoError(t, err) - assert.Equal(t, "2024-09-19T16:05:18.630000000Z", value.Format(RFC3339Nanosecond)) - } -} - -func TestParseFromInterfaceDateTime(t *testing.T) { - now := time.Now().In(time.UTC) - for _, supportedDateTimeLayout := range supportedDateTimeLayouts { - _time, err := ParseFromInterface(now.Format(supportedDateTimeLayout), TimestampTZKindType) - assert.NoError(t, err) - assert.Equal(t, _time.Format(supportedDateTimeLayout), now.Format(supportedDateTimeLayout)) - } -} - -func TestParseFromInterfaceTime(t *testing.T) { - now := time.Now() - for _, supportedTimeFormat := range SupportedTimeFormats { - _time, err := ParseFromInterface(now.Format(supportedTimeFormat), TimeKindType) - assert.NoError(t, err) - assert.Equal(t, _time.Format(supportedTimeFormat), now.Format(supportedTimeFormat)) - } -} - -func TestParseDateFromInterface(t *testing.T) { - now := time.Now() - for _, supportedDateFormat := range supportedDateFormats { - _time, err := ParseDateFromInterface(now.Format(supportedDateFormat)) + value, err := ParseTimestampTZFromInterface("2024-09-19T16:05:18.630000002Z") assert.NoError(t, err) - assert.Equal(t, _time.Format(supportedDateFormat), now.Format(supportedDateFormat)) + assert.Equal(t, "2024-09-19T16:05:18.630000002Z", value.Format(time.RFC3339Nano)) } } -func TestParseExtendedDateTime_TimestampTZ(t *testing.T) { - tsString := "2023-04-24T17:29:05.69944Z" - extTime, err := ParseDateTime(tsString, TimestampTZKindType) - assert.NoError(t, err) - assert.Equal(t, tsString, extTime.Format(time.RFC3339Nano)) -} - func TestParseTimestampNTZFromInterface(t *testing.T) { { // No fractional seconds diff --git a/lib/typing/ext/time.go b/lib/typing/ext/time.go index 7c0b0d99e..37f9bab51 100644 --- a/lib/typing/ext/time.go +++ b/lib/typing/ext/time.go @@ -16,8 +16,6 @@ const ( func (e ExtendedTimeKindType) defaultLayout() (string, error) { switch e { - case TimestampTZKindType: - return time.RFC3339Nano, nil case TimeKindType: return PostgresTimeFormat, nil default: diff --git a/lib/typing/ext/time_test.go b/lib/typing/ext/time_test.go deleted file mode 100644 index 4963ed24a..000000000 --- a/lib/typing/ext/time_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package ext - -import ( - "encoding/json" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestExtendedTime_MarshalJSON(t *testing.T) { - extTime := NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456000, time.UTC), TimestampTZKindType, RFC3339Millisecond) - - { - // Single value - bytes, err := json.Marshal(extTime) - assert.NoError(t, err) - assert.Equal(t, `"2025-09-13T00:00:00.123456Z"`, string(bytes)) - } - { - // As a nested object - type Object struct { - ExtendedTime *ExtendedTime `json:"extendedTime"` - Foo string `json:"foo"` - } - - var obj Object - obj.ExtendedTime = extTime - obj.Foo = "bar" - - bytes, err := json.Marshal(obj) - assert.NoError(t, err) - assert.Equal(t, `{"extendedTime":"2025-09-13T00:00:00.123456Z","foo":"bar"}`, string(bytes)) - } -} From 7ea804ac8c76cb4ba9cbe326bbeb6b8d44e0c909 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:03:12 -0700 Subject: [PATCH 04/10] WIP. --- clients/databricks/dialect/typing_test.go | 6 +++--- clients/mssql/dialect/dialect.go | 6 +++--- clients/redshift/dialect/typing.go | 6 +++--- lib/debezium/converters/time.go | 10 +++------- lib/typing/columns/diff_test.go | 5 ++--- 5 files changed, 14 insertions(+), 19 deletions(-) diff --git a/clients/databricks/dialect/typing_test.go b/clients/databricks/dialect/typing_test.go index 8d43c20ac..21c4e731a 100644 --- a/clients/databricks/dialect/typing_test.go +++ b/clients/databricks/dialect/typing_test.go @@ -42,11 +42,11 @@ func TestDatabricksDialect_DataTypeForKind(t *testing.T) { } { // Timestamp - assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.ETime.Kind, ExtendedTimeDetails: &ext.NestedKind{Type: ext.TimestampTZKindType}}, false)) + assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false)) } { // Timestamp (w/o timezone) - assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.ETime.Kind, ExtendedTimeDetails: &ext.NestedKind{Type: ext.TimestampTZKindType}}, false)) + assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false)) } { // Time @@ -145,7 +145,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) { // Timestamp kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP", "") assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), kd) + assert.Equal(t, typing.TimestampTZ, kd) } { // Timestamp NTZ diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 22f163152..9f3df5f5d 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -58,10 +58,10 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s case typing.TimestampNTZ.Kind: // Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088 return "datetime2" + case typing.TimestampTZ.Kind: + return "datetimeoffset" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { - case ext.TimestampTZKindType: - return "datetimeoffset" case ext.TimeKindType: return "time" } @@ -118,7 +118,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ "datetime2": return typing.TimestampNTZ, nil case "datetimeoffset": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") + return typing.TimestampTZ, nil case "time": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "") case "date": diff --git a/clients/redshift/dialect/typing.go b/clients/redshift/dialect/typing.go index b4d3de6be..f9f0326fc 100644 --- a/clients/redshift/dialect/typing.go +++ b/clients/redshift/dialect/typing.go @@ -49,10 +49,10 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string { return "DATE" case typing.TimestampNTZ.Kind: return "TIMESTAMP WITHOUT TIME ZONE" + case typing.TimestampTZ.Kind: + return "TIMESTAMP WITH TIME ZONE" case typing.ETime.Kind: switch kd.ExtendedTimeDetails.Type { - case ext.TimestampTZKindType: - return "timestamp with time zone" case ext.TimeKindType: return "time" } @@ -108,7 +108,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) ( case "timestamp", "timestamp without time zone": return typing.TimestampNTZ, nil case "timestamp with time zone": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") + return typing.TimestampTZ, nil case "time without time zone": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "") case "date": diff --git a/lib/debezium/converters/time.go b/lib/debezium/converters/time.go index 6c2bba4ab..97b021853 100644 --- a/lib/debezium/converters/time.go +++ b/lib/debezium/converters/time.go @@ -71,12 +71,8 @@ func (m MicroTime) Convert(value any) (any, error) { type ZonedTimestamp struct{} -func (ZonedTimestamp) layout() string { - return time.RFC3339Nano -} - func (z ZonedTimestamp) ToKindDetails() (typing.KindDetails, error) { - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, z.layout()) + return typing.TimestampTZ, nil } func (z ZonedTimestamp) Convert(value any) (any, error) { @@ -97,12 +93,12 @@ func (z ZonedTimestamp) Convert(value any) (any, error) { } } - _time, err := time.Parse(z.layout(), valString) + _time, err := time.Parse(time.RFC3339Nano, valString) if err != nil { return nil, fmt.Errorf("failed to parse %q: %w", valString, err) } - return ext.NewExtendedTime(_time, ext.TimestampTZKindType, z.layout()), nil + return _time, nil } type TimeWithTimezone struct{} diff --git a/lib/typing/columns/diff_test.go b/lib/typing/columns/diff_test.go index 46c76024a..79b7fafad 100644 --- a/lib/typing/columns/diff_test.go +++ b/lib/typing/columns/diff_test.go @@ -9,7 +9,6 @@ import ( "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" ) @@ -249,8 +248,8 @@ func TestDiffDeterministic(t *testing.T) { func TestCopyColMap(t *testing.T) { var cols Columns cols.AddColumn(NewColumn("hello", typing.String)) - cols.AddColumn(NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))) - cols.AddColumn(NewColumn("updated_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))) + cols.AddColumn(NewColumn("created_at", typing.TimestampTZ)) + cols.AddColumn(NewColumn("updated_at", typing.TimestampTZ)) copiedCols := CloneColumns(&cols) assert.Equal(t, copiedCols, &cols) From 87bf5a6389b2b1599dec2c94983fc632ddcb8142 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:14:23 -0700 Subject: [PATCH 05/10] WIP. --- clients/mssql/dialect/dialect_test.go | 2 +- clients/snowflake/ddl_test.go | 7 ++- clients/snowflake/dialect/dialect.go | 6 +-- lib/cdc/util/relational_event.go | 5 +- lib/debezium/converters/time_test.go | 66 ++++++++++----------------- lib/optimization/table_data_test.go | 5 +- lib/parquetutil/parse_values.go | 7 +++ lib/parquetutil/parse_values_test.go | 7 +-- 8 files changed, 42 insertions(+), 63 deletions(-) diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index d9ffd0c8e..1b0c7b33b 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -66,7 +66,7 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) { "time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), "datetime": typing.TimestampNTZ, "datetime2": typing.TimestampNTZ, - "datetimeoffset": typing.ETime, + "datetimeoffset": typing.TimestampTZ, } for col, expectedKind := range colToExpectedKind { diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index 62fff39d7..bd8c8c16c 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -13,7 +13,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" @@ -29,7 +28,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "created_at": typing.TimestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } @@ -58,7 +57,7 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "created_at": typing.TimestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } @@ -97,7 +96,7 @@ func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() { "customer_id": typing.Integer, "price": typing.Float, "name": typing.String, - "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "created_at": typing.TimestampTZ, } { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index f9985ee21..b2ec3f4af 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -34,10 +34,10 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) return "date" case typing.TimestampNTZ.Kind: return "timestamp_ntz" + case typing.TimestampTZ.Kind: + return "timestamp_tz" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { - case ext.TimestampTZKindType: - return "timestamp_tz" case ext.TimeKindType: return "time" } @@ -98,7 +98,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing. case "array": return typing.Array, nil case "timestamp_ltz", "timestamp_tz": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") + return typing.TimestampTZ, nil case "timestamp", "datetime", "timestamp_ntz": return typing.TimestampNTZ, nil case "time": diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go index 956e92f47..01cc01583 100644 --- a/lib/cdc/util/relational_event.go +++ b/lib/cdc/util/relational_event.go @@ -9,7 +9,6 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" ) // SchemaEventPayload is our struct for an event with schema enabled. For reference, this is an example payload https://gist.github.com/Tang8330/3b9989ed8c659771958fe481f248397a @@ -113,11 +112,11 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf } if tc.IncludeArtieUpdatedAt { - retMap[constants.UpdateColumnMarker] = ext.NewExtendedTime(time.Now().UTC(), ext.TimestampTZKindType, ext.ISO8601) + retMap[constants.UpdateColumnMarker] = time.Now().UTC() } if tc.IncludeDatabaseUpdatedAt { - retMap[constants.DatabaseUpdatedColumnMarker] = ext.NewExtendedTime(s.GetExecutionTime(), ext.TimestampTZKindType, ext.ISO8601) + retMap[constants.DatabaseUpdatedColumnMarker] = s.GetExecutionTime() } return retMap, nil diff --git a/lib/debezium/converters/time_test.go b/lib/debezium/converters/time_test.go index 7ec983f96..f5878e21b 100644 --- a/lib/debezium/converters/time_test.go +++ b/lib/debezium/converters/time_test.go @@ -34,28 +34,22 @@ func TestZonedTimestamp_Convert(t *testing.T) { // No fractional seconds val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:12Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 12, 000000000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:12Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 12, 000000000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:12Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 1 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.1Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 100000000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.1Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 100000000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.1Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 2 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.12Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 120000000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.12Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 120000000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.12Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 3 digits @@ -64,71 +58,57 @@ func TestZonedTimestamp_Convert(t *testing.T) { expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123000000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.123Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, "2025-09-13T00:00:00.123Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 4 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.1234Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123400000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.1234Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123400000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.1234Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 5 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.12345Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123450000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.12345Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123450000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.12345Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 6 digits (microseconds) val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.123456Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.123456Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456000, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.123456Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 7 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.1234567Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456700, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.1234567Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456700, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.1234567Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 8 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.12345678Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456780, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.12345678Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456780, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.12345678Z", val.(time.Time).Format(time.RFC3339Nano)) } { // 9 digits (nanoseconds) val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.123456789Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456789, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, "2025-09-13T00:00:00.123456789Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456789, time.UTC), val.(time.Time)) + assert.Equal(t, "2025-09-13T00:00:00.123456789Z", val.(time.Time).Format(time.RFC3339Nano)) } { // Testing timezone offset - ts := "2025-09-13T00:00:00.123456789+07:00" - val, err := ZonedTimestamp{}.Convert(ts) + valueString := "2025-09-13T00:00:00.123456789+07:00" + val, err := ZonedTimestamp{}.Convert(valueString) assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123456789, time.FixedZone("", 7*60*60)), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) - assert.Equal(t, ts, val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout())) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456789, time.FixedZone("", 7*60*60)), val.(time.Time)) + assert.Equal(t, valueString, val.(time.Time).Format(time.RFC3339Nano)) } } } diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go index 278c4a1b3..d3e3f17fc 100644 --- a/lib/optimization/table_data_test.go +++ b/lib/optimization/table_data_test.go @@ -375,10 +375,9 @@ func TestMergeColumn(t *testing.T) { // Testing for backwards compatibility // in-memory column is TimestampNTZ, destination column is TimestampTZ timestampNTZColumn := columns.NewColumn("foo", typing.TimestampNTZ) - timestampTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")) + timestampTZColumn := columns.NewColumn("foo", typing.TimestampTZ) col := mergeColumn(timestampNTZColumn, timestampTZColumn) - assert.Equal(t, ext.TimestampTZKindType, col.KindDetails.ExtendedTimeDetails.Type) - assert.Equal(t, "2006-01-02T15:04:05.999999999Z07:00", col.KindDetails.ExtendedTimeDetails.Format) + assert.Equal(t, typing.TimestampTZ, col.KindDetails) } { // Copy the dest column format if in-mem column format is empty. diff --git a/lib/parquetutil/parse_values.go b/lib/parquetutil/parse_values.go index 53f041dc5..88ed55720 100644 --- a/lib/parquetutil/parse_values.go +++ b/lib/parquetutil/parse_values.go @@ -33,6 +33,13 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) { return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) } + return _time.UnixMilli(), nil + case typing.TimestampTZ.Kind: + _time, err := ext.ParseTimestampTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) + } + return _time.UnixMilli(), nil case typing.ETime.Kind: if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil { diff --git a/lib/parquetutil/parse_values_test.go b/lib/parquetutil/parse_values_test.go index fd5e20124..9cdf8416f 100644 --- a/lib/parquetutil/parse_values_test.go +++ b/lib/parquetutil/parse_values_test.go @@ -74,12 +74,7 @@ func TestParseValue(t *testing.T) { } { // Timestamp TZ - eDateTime := typing.ETime - nestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, "") - assert.NoError(t, err) - - eDateTime.ExtendedTimeDetails = &nestedKind - value, err := ParseValue("2023-04-24T17:29:05.69944Z", columns.NewColumn("", eDateTime)) + value, err := ParseValue("2023-04-24T17:29:05.69944Z", columns.NewColumn("", typing.TimestampTZ)) assert.NoError(t, err) assert.Equal(t, int64(1682357345699), value) } From ab7d8402b0109915f8cb6152aa3ab3ac17b37696 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:31:12 -0700 Subject: [PATCH 06/10] Fix tests. --- clients/redshift/dialect/typing_test.go | 7 ++-- clients/shared/default_value.go | 8 ++++ clients/shared/default_value_test.go | 19 +++------- clients/snowflake/dialect/dialect_test.go | 4 +- clients/snowflake/snowflake_test.go | 7 ++-- lib/cdc/relational/debezium_test.go | 11 ++---- lib/debezium/converters/time_test.go | 4 +- lib/debezium/schema_test.go | 3 +- lib/debezium/types_test.go | 5 +-- lib/destination/ddl/ddl_sflk_test.go | 9 ++--- .../table_data_merge_columns_test.go | 28 +++++++------- lib/optimization/table_data_test.go | 38 ++++--------------- lib/typing/ext/parse.go | 2 + lib/typing/ext/parse_test.go | 25 ++++++++---- lib/typing/ext/time.go | 3 +- 15 files changed, 75 insertions(+), 98 deletions(-) diff --git a/clients/redshift/dialect/typing_test.go b/clients/redshift/dialect/typing_test.go index e1f78b623..9a3176dbb 100644 --- a/clients/redshift/dialect/typing_test.go +++ b/clients/redshift/dialect/typing_test.go @@ -47,7 +47,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) { // Timestamps { // With timezone - assert.Equal(t, "timestamp with time zone", RedshiftDialect{}.DataTypeForKind(typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), false)) + assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false)) } { // Without timezone @@ -132,12 +132,13 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) { { // Times { + // TimestampTZ kd, err := dialect.KindForDataType("timestamp with time zone", "") assert.NoError(t, err) - assert.Equal(t, typing.ETime.Kind, kd.Kind) - assert.Equal(t, ext.TimestampTZKindType, kd.ExtendedTimeDetails.Type) + assert.Equal(t, typing.TimestampTZ, kd) } { + // TimestampNTZ kd, err := dialect.KindForDataType("timestamp without time zone", "") assert.NoError(t, err) assert.Equal(t, typing.TimestampNTZ, kd) diff --git a/clients/shared/default_value.go b/clients/shared/default_value.go index 4036f95f8..b8af4298b 100644 --- a/clients/shared/default_value.go +++ b/clients/shared/default_value.go @@ -3,6 +3,7 @@ package shared import ( "fmt" "log/slog" + "time" bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/destination" @@ -35,6 +36,13 @@ func DefaultValue(column columns.Column, dialect sql.Dialect) (any, error) { } return sql.QuoteLiteral(_time.Format(ext.RFC3339NoTZ)), nil + case typing.TimestampTZ.Kind: + _time, err := ext.ParseTimestampTZFromInterface(column.DefaultValue()) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", column.DefaultValue(), err) + } + + return sql.QuoteLiteral(_time.Format(time.RFC3339Nano)), nil case typing.ETime.Kind: if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil { return nil, err diff --git a/clients/shared/default_value_test.go b/clients/shared/default_value_test.go index 35fd91977..08aebc57b 100644 --- a/clients/shared/default_value_test.go +++ b/clients/shared/default_value_test.go @@ -25,21 +25,12 @@ var dialects = []sql.Dialect{ func TestColumn_DefaultValue(t *testing.T) { birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) - birthdayDateTime, err := ext.ParseDateTime(birthday.Format(ext.ISO8601), ext.TimestampTZKindType) - assert.NoError(t, err) // time timeKind := typing.ETime timeNestedKind, err := ext.NewNestedKind(ext.TimeKindType, "") assert.NoError(t, err) timeKind.ExtendedTimeDetails = &timeNestedKind - - // date time - dateTimeKind := typing.ETime - dateTimeNestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, "") - assert.NoError(t, err) - dateTimeKind.ExtendedTimeDetails = &dateTimeNestedKind - testCases := []struct { name string col columns.Column @@ -79,22 +70,22 @@ func TestColumn_DefaultValue(t *testing.T) { }, { name: "date", - col: columns.NewColumnWithDefaultValue("", typing.Date, birthdayDateTime), + col: columns.NewColumnWithDefaultValue("", typing.Date, birthday), expectedValue: "'2022-09-06'", }, { name: "timestamp_ntz", - col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthdayDateTime), + col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthday), expectedValue: "'2022-09-06T03:19:24.942'", }, { name: "time", - col: columns.NewColumnWithDefaultValue("", timeKind, birthdayDateTime), + col: columns.NewColumnWithDefaultValue("", timeKind, birthday), expectedValue: "'03:19:24.942'", }, { - name: "datetime", - col: columns.NewColumnWithDefaultValue("", dateTimeKind, birthdayDateTime), + name: "timestamp_tz", + col: columns.NewColumnWithDefaultValue("", typing.TimestampTZ, birthday), expectedValue: "'2022-09-06T03:19:24.942Z'", }, } diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 93d7491a9..39aa7815a 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -180,7 +180,7 @@ func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) { for _, expectedDateTime := range expectedDateTimes { kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "") assert.NoError(t, err) - assert.Equal(t, ext.TimestampTZKindType, kd.ExtendedTimeDetails.Type, expectedDateTime) + assert.Equal(t, typing.TimestampTZ, kd, expectedDateTime) } } { @@ -196,7 +196,7 @@ func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) { func TestSnowflakeDialect_KindForDataType_NoDataLoss(t *testing.T) { kindDetails := []typing.KindDetails{ - typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + typing.TimestampTZ, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), typing.Date, typing.String, diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 06dfed4ad..34b966fef 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -20,7 +20,6 @@ import ( "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" ) func (s *SnowflakeTestSuite) identifierFor(tableData *optimization.TableData) sql.TableIdentifier { @@ -224,7 +223,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, // Add kindDetails to created_at - "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "created_at": typing.TimestampTZ, } var cols columns.Columns @@ -240,7 +239,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { snowflakeColToKindDetailsMap := map[string]typing.KindDetails{ "id": typing.Integer, - "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "created_at": typing.TimestampTZ, "name": typing.String, constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, @@ -273,7 +272,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { inMemColumns := tableData.ReadOnlyInMemoryCols() // Since sflkColumns overwrote the format, let's set it correctly again. - inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, time.RFC3339Nano))) + inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.TimestampTZ)) tableData.SetInMemoryColumns(inMemColumns) break } diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index 34092f740..732ad0352 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -9,8 +9,6 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/ext" - "github.com/stretchr/testify/assert" ) @@ -90,7 +88,7 @@ func (r *RelationTestSuite) TestPostgresEvent() { evtData, err := evt.GetData(map[string]any{"id": 59}, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true}) assert.NoError(r.T(), err) assert.Equal(r.T(), float64(59), evtData["id"]) - assert.Equal(r.T(), ext.NewExtendedTime(time.Date(2022, time.November, 16, 4, 1, 53, 308000000, time.UTC), ext.TimestampTZKindType, ext.ISO8601), evtData[constants.DatabaseUpdatedColumnMarker]) + assert.Equal(r.T(), time.Date(2022, time.November, 16, 4, 1, 53, 308000000, time.UTC), evtData[constants.DatabaseUpdatedColumnMarker]) assert.Equal(r.T(), "Barings Participation Investors", evtData["item"]) assert.Equal(r.T(), map[string]any{"object": "foo"}, evtData["nested"]) @@ -531,11 +529,8 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() { evtData, err = evt.GetData(kvMap, kafkalib.TopicConfig{IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true}) assert.NoError(r.T(), err) - assert.Equal(r.T(), ext.NewExtendedTime(time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), ext.TimestampTZKindType, ext.ISO8601), evtData[constants.DatabaseUpdatedColumnMarker]) - - updatedAtExtTime, isOk := evtData[constants.UpdateColumnMarker].(*ext.ExtendedTime) - assert.True(r.T(), isOk) - assert.False(r.T(), updatedAtExtTime.GetTime().IsZero()) + assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evtData[constants.DatabaseUpdatedColumnMarker]) + assert.False(r.T(), evtData[constants.UpdateColumnMarker].(time.Time).IsZero()) assert.Equal(r.T(), int64(1001), evtData["id"]) assert.Equal(r.T(), "Sally", evtData["first_name"]) diff --git a/lib/debezium/converters/time_test.go b/lib/debezium/converters/time_test.go index f5878e21b..1690f860d 100644 --- a/lib/debezium/converters/time_test.go +++ b/lib/debezium/converters/time_test.go @@ -55,9 +55,7 @@ func TestZonedTimestamp_Convert(t *testing.T) { // 3 digits val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.123Z") assert.NoError(t, err) - - expectedExtTime := ext.NewExtendedTime(time.Date(2025, time.September, 13, 0, 0, 0, 123000000, time.UTC), ext.TimestampTZKindType, time.RFC3339Nano) - assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime)) + assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123000000, time.UTC), val.(time.Time)) assert.Equal(t, "2025-09-13T00:00:00.123Z", val.(time.Time).Format(time.RFC3339Nano)) } { diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index 0c1efdcd6..f10042218 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -2,7 +2,6 @@ package debezium import ( "testing" - "time" "github.com/stretchr/testify/assert" @@ -233,7 +232,7 @@ func TestField_ToKindDetails(t *testing.T) { // Timestamp with timezone kd, err := Field{DebeziumType: ZonedTimestamp}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, time.RFC3339Nano), kd) + assert.Equal(t, typing.TimestampTZ, kd) } { // Timestamp without timezone diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index 1fb28f581..b00956f1a 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -49,11 +49,10 @@ func TestField_ShouldSetDefaultValue(t *testing.T) { { // *ext.ExtendedTime field := Field{} - assert.True(t, field.ShouldSetDefaultValue(ext.NewExtendedTime(time.Now(), ext.TimestampTZKindType, ext.RFC3339Millisecond))) - + assert.True(t, field.ShouldSetDefaultValue(ext.NewExtendedTime(time.Now(), ext.TimeKindType, ""))) assert.False(t, field.ShouldSetDefaultValue(&ext.ExtendedTime{})) var ts time.Time - assert.False(t, field.ShouldSetDefaultValue(ext.NewExtendedTime(ts, ext.TimestampTZKindType, ext.RFC3339Millisecond))) + assert.False(t, field.ShouldSetDefaultValue(ext.NewExtendedTime(ts, ext.TimeKindType, ""))) } { // time.Time diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 572aedacd..5df12bb2f 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -14,7 +14,6 @@ import ( "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" ) func (d *DDLTestSuite) TestAlterComplexObjects() { @@ -51,7 +50,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { func (d *DDLTestSuite) TestAlterIdempotency() { cols := []columns.Column{ - columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("created_at", typing.TimestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("order_name", typing.String), columns.NewColumn("start", typing.String), @@ -81,7 +80,7 @@ func (d *DDLTestSuite) TestAlterIdempotency() { func (d *DDLTestSuite) TestAlterTableAdd() { // Test adding a bunch of columns cols := []columns.Column{ - columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("created_at", typing.TimestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("order_name", typing.String), columns.NewColumn("start", typing.String), @@ -123,7 +122,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { // Test adding a bunch of columns cols := []columns.Column{ - columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("created_at", typing.TimestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("name", typing.String), columns.NewColumn("start", typing.String), @@ -180,7 +179,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { func (d *DDLTestSuite) TestAlterTableDelete() { // Test adding a bunch of columns cols := []columns.Column{ - columns.NewColumn("created_at", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("created_at", typing.TimestampTZ), columns.NewColumn("id", typing.Integer), columns.NewColumn("name", typing.String), columns.NewColumn("col_to_delete", typing.String), diff --git a/lib/optimization/table_data_merge_columns_test.go b/lib/optimization/table_data_merge_columns_test.go index 07105ed64..ccd28c4d0 100644 --- a/lib/optimization/table_data_merge_columns_test.go +++ b/lib/optimization/table_data_merge_columns_test.go @@ -14,12 +14,12 @@ func TestTableData_UpdateInMemoryColumnsFromDestination_Tz(t *testing.T) { { // In memory and destination columns are both timestamp_tz tableData := &TableData{inMemoryColumns: &columns.Columns{}} - tableData.AddInMemoryCol(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))) + tableData.AddInMemoryCol(columns.NewColumn("foo", typing.TimestampTZ)) - assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")))) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("foo", typing.TimestampTZ))) updatedColumn, isOk := tableData.inMemoryColumns.GetColumn("foo") assert.True(t, isOk) - assert.Equal(t, ext.TimestampTZKindType, updatedColumn.KindDetails.ExtendedTimeDetails.Type) + assert.Equal(t, typing.TimestampTZ, updatedColumn.KindDetails) } { // In memory is timestamp_ntz and destination is timestamp_tz @@ -31,13 +31,11 @@ func TestTableData_UpdateInMemoryColumnsFromDestination_Tz(t *testing.T) { ), ) - assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")))) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("foo", typing.TimestampTZ))) updatedColumn, isOk := tableData.inMemoryColumns.GetColumn("foo") assert.True(t, isOk) - assert.Equal(t, ext.TimestampTZKindType, updatedColumn.KindDetails.ExtendedTimeDetails.Type) - assert.Equal(t, "2006-01-02T15:04:05.999999999Z07:00", updatedColumn.KindDetails.ExtendedTimeDetails.Format) + assert.Equal(t, typing.TimestampTZ, updatedColumn.KindDetails) } - } func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { @@ -113,7 +111,7 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { // Casting these as STRING so tableColumn via this f(x) will set it correctly. tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String)) tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String)) - tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("string_to_timestamp_tz", typing.String)) tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String)) extDecimalType := typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(22, 2)) @@ -121,7 +119,7 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String)) // Testing extTimeDetails - for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} { + for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "string_to_timestamp_tz"} { col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol) assert.True(t, isOk, extTimeDetailsCol) assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol) @@ -130,7 +128,6 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")))) assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.Date))) - assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")))) dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date") assert.True(t, isOk) @@ -141,10 +138,13 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails) assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type) - dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime") - assert.True(t, isOk) - assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails) - assert.Equal(t, ext.TimestampTZKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type) + { + // Update column from string to TimestampTZ + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("string_to_timestamp_tz", typing.TimestampTZ))) + col, isOk := tableData.inMemoryColumns.GetColumn("string_to_timestamp_tz") + assert.True(t, isOk) + assert.Equal(t, typing.TimestampTZ, col.KindDetails) + } // Testing extDecimalDetails // Confirm that before you update, it's invalid. diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go index d3e3f17fc..78862e0fb 100644 --- a/lib/optimization/table_data_test.go +++ b/lib/optimization/table_data_test.go @@ -131,10 +131,9 @@ func TestTableData_ReadOnlyInMemoryCols(t *testing.T) { func TestTableData_UpdateInMemoryColumns(t *testing.T) { var _cols columns.Columns for colName, colKind := range map[string]typing.KindDetails{ - "FOO": typing.String, - "bar": typing.Invalid, - "CHANGE_me": typing.String, - "do_not_change_format": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "FOO": typing.String, + "bar": typing.Invalid, + "CHANGE_me": typing.String, } { _cols.AddColumn(columns.NewColumn(colName, colKind)) } @@ -143,23 +142,16 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) { inMemoryColumns: &_cols, } - extCol, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("do_not_change_format") - assert.True(t, isOk) - - extCol.KindDetails.ExtendedTimeDetails.Format = time.RFC3339Nano - tableData.inMemoryColumns.UpdateColumn(columns.NewColumn(extCol.Name(), extCol.KindDetails)) - for name, colKindDetails := range map[string]typing.KindDetails{ - "foo": typing.String, - "change_me": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), - "bar": typing.Boolean, - "do_not_change_format": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "foo": typing.String, + "change_me": typing.TimestampTZ, + "bar": typing.Boolean, } { assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn(name, colKindDetails))) } // It's saved back in the original format. - _, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("foo") + _, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("foo") assert.False(t, isOk) _, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("FOO") @@ -167,18 +159,12 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) { col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("CHANGE_me") assert.True(t, isOk) - assert.Equal(t, ext.TimestampTZKindType, col.KindDetails.ExtendedTimeDetails.Type) + assert.Equal(t, typing.TimestampTZ, col.KindDetails) // It went from invalid to boolean. col, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("bar") assert.True(t, isOk) assert.Equal(t, typing.Boolean, col.KindDetails) - - col, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("do_not_change_format") - assert.True(t, isOk) - assert.Equal(t, col.KindDetails.Kind, typing.ETime.Kind) - assert.Equal(t, col.KindDetails.ExtendedTimeDetails.Type, ext.TimestampTZKindType, "correctly mapped type") - assert.Equal(t, col.KindDetails.ExtendedTimeDetails.Format, time.RFC3339Nano, "format has been preserved") } func TestTableData_ShouldFlushRowLength(t *testing.T) { @@ -379,13 +365,5 @@ func TestMergeColumn(t *testing.T) { col := mergeColumn(timestampNTZColumn, timestampTZColumn) assert.Equal(t, typing.TimestampTZ, col.KindDetails) } - { - // Copy the dest column format if in-mem column format is empty. - inMemoryColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")) - // Clearing the format - inMemoryColumn.KindDetails.ExtendedTimeDetails.Format = "" - destinationColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")) - assert.Equal(t, destinationColumn.KindDetails.ExtendedTimeDetails.Format, mergeColumn(inMemoryColumn, destinationColumn).KindDetails.ExtendedTimeDetails.Format) - } } } diff --git a/lib/typing/ext/parse.go b/lib/typing/ext/parse.go index f70233582..0cf3f03cd 100644 --- a/lib/typing/ext/parse.go +++ b/lib/typing/ext/parse.go @@ -48,6 +48,8 @@ func ParseTimestampNTZFromInterface(val any) (time.Time, error) { func ParseTimestampTZFromInterface(val any) (time.Time, error) { switch convertedVal := val.(type) { + case nil: + return time.Time{}, fmt.Errorf("val is nil") case time.Time: return convertedVal, nil case *ExtendedTime: diff --git a/lib/typing/ext/parse_test.go b/lib/typing/ext/parse_test.go index 4bb515c68..18222a5a0 100644 --- a/lib/typing/ext/parse_test.go +++ b/lib/typing/ext/parse_test.go @@ -45,20 +45,29 @@ func TestParseTimestampTZFromInterface(t *testing.T) { assert.ErrorContains(t, err, "val is nil") } { - // True - _, err := ParseTimestampTZFromInterface(true) - assert.ErrorContains(t, err, "failed to parse colVal, expected type string or *ExtendedTime and got: bool") + // Boolean + { + // True + _, err := ParseTimestampTZFromInterface(true) + assert.ErrorContains(t, err, "unsupported type: bool") + } + { + // False + _, err := ParseTimestampTZFromInterface(false) + assert.ErrorContains(t, err, "unsupported type: bool") + } } { - // False - _, err := ParseTimestampTZFromInterface(false) - assert.ErrorContains(t, err, "failed to parse colVal, expected type string or *ExtendedTime and got: bool") + // time.Time + value, err := ParseTimestampTZFromInterface(time.Date(2024, 9, 19, 16, 5, 18, 123_456_789, time.UTC)) + assert.NoError(t, err) + assert.Equal(t, "2024-09-19T16:05:18.123456789Z", value.Format(time.RFC3339Nano)) } { // String - RFC3339MillisecondUTC - value, err := ParseTimestampTZFromInterface("2024-09-19T16:05:18.630Z") + value, err := ParseTimestampTZFromInterface("2024-09-19T16:05:18.631Z") assert.NoError(t, err) - assert.Equal(t, "2024-09-19T16:05:18.630Z", value.Format(time.RFC3339Nano)) + assert.Equal(t, "2024-09-19T16:05:18.631Z", value.Format(time.RFC3339Nano)) } { // String - RFC3339MicrosecondUTC diff --git a/lib/typing/ext/time.go b/lib/typing/ext/time.go index 37f9bab51..3773e3f88 100644 --- a/lib/typing/ext/time.go +++ b/lib/typing/ext/time.go @@ -10,8 +10,7 @@ import ( type ExtendedTimeKindType string const ( - TimestampTZKindType ExtendedTimeKindType = "timestamp_tz" - TimeKindType ExtendedTimeKindType = "time" + TimeKindType ExtendedTimeKindType = "time" ) func (e ExtendedTimeKindType) defaultLayout() (string, error) { From 68390ae377462c002e26f28e4aba36d450571e8b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:33:05 -0700 Subject: [PATCH 07/10] Clean --- .../bigquery/converters/converters_test.go | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index 7eea88c9e..61d7ba3d6 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/artie-labs/transfer/lib/typing/ext" + "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/numbers" @@ -55,6 +57,12 @@ func TestStringConverter_Convert(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "2021-01-01T09:10:12.400123991", val) } + { + // Timestamp TZ + val, err := NewStringConverter(typing.TimestampTZ).Convert(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)) + assert.NoError(t, err) + assert.Equal(t, "2021-01-01T00:00:00Z", val) + } { // Invalid _, err := NewStringConverter(typing.String).Convert(time.Date(2021, 1, 1, 9, 10, 12, 400_123_991, time.UTC)) @@ -62,10 +70,16 @@ func TestStringConverter_Convert(t *testing.T) { } } { - // Extended time - val, err := NewStringConverter(typing.TimestampTZ).Convert(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)) + // Extended Time + val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimeKindType, "")).Convert( + ext.NewExtendedTime( + time.Date(2021, 1, 1, 9, 10, 11, 123_456_789, time.UTC), + ext.TimeKindType, + "", + ), + ) assert.NoError(t, err) - assert.Equal(t, "2021-01-01T00:00:00Z", val) + assert.Equal(t, "09:10:11.123456+00", val) } } From a0362c12eccc70e00cf38991b2152baad66e19c4 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:33:29 -0700 Subject: [PATCH 08/10] Imports. --- clients/bigquery/converters/converters_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/bigquery/converters/converters_test.go b/clients/bigquery/converters/converters_test.go index 61d7ba3d6..a3aeb3f8e 100644 --- a/clients/bigquery/converters/converters_test.go +++ b/clients/bigquery/converters/converters_test.go @@ -4,13 +4,12 @@ import ( "testing" "time" - "github.com/artie-labs/transfer/lib/typing/ext" - "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/numbers" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/artie-labs/transfer/lib/typing/ext" ) func TestStringConverter_Convert(t *testing.T) { From acd113fbde7db99ebf18face711946f542eda64c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:35:05 -0700 Subject: [PATCH 09/10] Clean up. --- clients/bigquery/storagewrite.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 09fdc589a..8c303179f 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -8,6 +8,10 @@ import ( "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/bigquery/storage/managedwriter/adapt" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/dynamicpb" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/artie-labs/transfer/clients/bigquery/converters" "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" @@ -15,8 +19,6 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" - "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/types/dynamicpb" ) // columnToTableFieldSchema returns a [*storagepb.TableFieldSchema] suitable for transferring data of the type that the column specifies. @@ -220,6 +222,10 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err) } + if err = timestamppb.New(_time).CheckValid(); err != nil { + return nil, err + } + message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro())) case typing.ETime.Kind: if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil { From 2120e87a12340f9ae552abecf2f9b454aa2a612f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 31 Oct 2024 15:39:36 -0700 Subject: [PATCH 10/10] Clean up. --- clients/mssql/values.go | 7 +++++++ clients/mssql/values_test.go | 15 +++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/clients/mssql/values.go b/clients/mssql/values.go index 1e1b767f7..14b6f69a5 100644 --- a/clients/mssql/values.go +++ b/clients/mssql/values.go @@ -40,6 +40,13 @@ func parseValue(colVal any, colKind columns.Column) (any, error) { return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) } + return _time, nil + case typing.TimestampTZ.Kind: + _time, err := ext.ParseTimestampTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) + } + return _time, nil case typing.ETime.Kind: if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil { diff --git a/clients/mssql/values_test.go b/clients/mssql/values_test.go index 41a01c17f..5584eeb05 100644 --- a/clients/mssql/values_test.go +++ b/clients/mssql/values_test.go @@ -48,6 +48,21 @@ func TestParseValue(t *testing.T) { assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time)) } } + { + // Timestamp TZ + { + // String + val, err := parseValue("2021-01-04T09:32:00Z", columns.NewColumn("timestamp_tz", typing.TimestampTZ)) + assert.NoError(t, err) + assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time)) + } + { + // time.Time + val, err := parseValue(time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), columns.NewColumn("timestamp_tz", typing.TimestampTZ)) + assert.NoError(t, err) + assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time)) + } + } { val, err := parseValue("string value", columns.NewColumn("foo", typing.String)) assert.NoError(t, err)