diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 3692759ce..a8e10c9dd 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -49,14 +49,14 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s return "json" case typing.Date.Kind: return "date" + case typing.TimestampNTZ.Kind: + return "datetime" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { case ext.TimestampTZKindType: // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type // We should be using TIMESTAMP since it's an absolute point in time. return "timestamp" - case ext.TimestampNTZKindType: - return "datetime" case ext.TimeKindType: return "time" } @@ -108,7 +108,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD case "timestamp": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") case "datetime": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "") + return typing.TimestampNTZ, nil case "time": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "") case "date": diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 660d952cb..a13c86fdd 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -92,7 +92,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) { "record": typing.Struct, "json": typing.Struct, // Datetime - "datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "datetime": typing.TimestampNTZ, "timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), "time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), "date": typing.Date, diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 386755894..715b8782f 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -40,14 +40,14 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem fieldType = storagepb.TableFieldSchema_STRING case typing.Date.Kind: fieldType = storagepb.TableFieldSchema_DATE + case typing.TimestampNTZ.Kind: + fieldType = storagepb.TableFieldSchema_DATETIME case typing.ETime.Kind: switch column.KindDetails.ExtendedTimeDetails.Type { case ext.TimeKindType: fieldType = storagepb.TableFieldSchema_TIME case ext.TimestampTZKindType: fieldType = storagepb.TableFieldSchema_TIMESTAMP - case ext.TimestampNTZKindType: - fieldType = storagepb.TableFieldSchema_DATETIME default: return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type) } @@ -209,6 +209,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto daysSinceEpoch := _time.Unix() / (60 * 60 * 24) message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch))) + case typing.TimestampNTZ.Kind: + _time, err := ext.ParseTimestampNTZFromInterface(value) + if err != nil { + return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err) + } + + message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time))) case typing.ETime.Kind: if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil { return nil, err @@ -227,8 +234,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto return nil, err } message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro())) - case ext.TimestampNTZKindType: - message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time))) default: return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type) } diff --git a/clients/bigquery/storagewrite_test.go b/clients/bigquery/storagewrite_test.go index 1ec789b9e..b17e192b8 100644 --- a/clients/bigquery/storagewrite_test.go +++ b/clients/bigquery/storagewrite_test.go @@ -168,8 +168,9 @@ func TestRowToMessage(t *testing.T) { columns.NewColumn("c_string", typing.String), columns.NewColumn("c_string_decimal", typing.String), columns.NewColumn("c_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")), + columns.NewColumn("c_timestamp", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), columns.NewColumn("c_date", typing.Date), - columns.NewColumn("c_datetime", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")), + columns.NewColumn("c_datetime", typing.TimestampNTZ), columns.NewColumn("c_struct", typing.Struct), columns.NewColumn("c_array", typing.Array), } @@ -188,8 +189,9 @@ func TestRowToMessage(t *testing.T) { "c_string": "foo bar", "c_string_decimal": decimal.NewDecimal(numbers.MustParseDecimal("1.61803")), "c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""), + "c_timestamp": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""), "c_date": time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), - "c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""), + "c_datetime": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), "c_struct": map[string]any{"baz": []string{"foo", "bar"}}, "c_array": []string{"foo", "bar"}, } @@ -220,8 +222,9 @@ func TestRowToMessage(t *testing.T) { "cString": "foo bar", "cStringDecimal": "1.61803", "cTime": "17521704960", + "cTimestamp": "981173106000000", "cDate": float64(11356), - "cDatetime": "981173106000000", + "cDatetime": "140817083031093248", "cStruct": `{"baz":["foo","bar"]}`, "cArray": []any{"foo", "bar"}, }, result) diff --git a/clients/databricks/dialect/typing.go b/clients/databricks/dialect/typing.go index 65f0c989d..cf451f86a 100644 --- a/clients/databricks/dialect/typing.go +++ b/clients/databricks/dialect/typing.go @@ -26,14 +26,14 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) return "BOOLEAN" case typing.Date.Kind: return "DATE" + case typing.TimestampNTZ.Kind: + // This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables. + // Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html + return "TIMESTAMP_NTZ" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { case ext.TimestampTZKindType: return "TIMESTAMP" - case ext.TimestampNTZKindType: - // This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables. - // Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html - return "TIMESTAMP_NTZ" case ext.TimeKindType: return "STRING" } @@ -76,7 +76,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD case "timestamp": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") case "timestamp_ntz": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "") + return typing.TimestampNTZ, nil } return typing.Invalid, fmt.Errorf("unsupported data type: %q", rawType) diff --git a/clients/databricks/dialect/typing_test.go b/clients/databricks/dialect/typing_test.go index 60f55181d..8d43c20ac 100644 --- a/clients/databricks/dialect/typing_test.go +++ b/clients/databricks/dialect/typing_test.go @@ -151,7 +151,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) { // Timestamp NTZ kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP_NTZ", "") assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), kd) + assert.Equal(t, typing.TimestampNTZ, kd) } { // Variant diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 8a3cafe26..22f163152 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -55,13 +55,13 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s return "BIT" case typing.Date.Kind: return "DATE" + case typing.TimestampNTZ.Kind: + // Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088 + return "datetime2" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { case ext.TimestampTZKindType: return "datetimeoffset" - case ext.TimestampNTZKindType: - // Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088 - return "datetime2" case ext.TimeKindType: return "time" } @@ -116,7 +116,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ case "datetime", "datetime2": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "") + return typing.TimestampNTZ, nil case "datetimeoffset": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") case "time": diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index bcb0dd1d8..d9ffd0c8e 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -50,22 +50,23 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) { dialect := MSSQLDialect{} colToExpectedKind := map[string]typing.KindDetails{ - "char": typing.String, - "varchar": typing.String, - "nchar": typing.String, - "nvarchar": typing.String, - "ntext": typing.String, - "text": typing.String, - "smallint": typing.Integer, - "tinyint": typing.Integer, - "int": typing.Integer, - "float": typing.Float, - "real": typing.Float, - "bit": typing.Boolean, - "date": typing.Date, - "time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), - "datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), - "datetime2": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), + "char": typing.String, + "varchar": typing.String, + "nchar": typing.String, + "nvarchar": typing.String, + "ntext": typing.String, + "text": typing.String, + "smallint": typing.Integer, + "tinyint": typing.Integer, + "int": typing.Integer, + "float": typing.Float, + "real": typing.Float, + "bit": typing.Boolean, + "date": typing.Date, + "time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""), + "datetime": typing.TimestampNTZ, + "datetime2": typing.TimestampNTZ, + "datetimeoffset": typing.ETime, } for col, expectedKind := range colToExpectedKind { diff --git a/clients/mssql/values.go b/clients/mssql/values.go index 161aca329..1e1b767f7 100644 --- a/clients/mssql/values.go +++ b/clients/mssql/values.go @@ -33,6 +33,13 @@ func parseValue(colVal any, colKind columns.Column) (any, error) { return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) } + return _time, nil + case typing.TimestampNTZ.Kind: + _time, err := ext.ParseTimestampNTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) + } + return _time, nil case typing.ETime.Kind: if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil { diff --git a/clients/mssql/values_test.go b/clients/mssql/values_test.go index f9818534a..41a01c17f 100644 --- a/clients/mssql/values_test.go +++ b/clients/mssql/values_test.go @@ -33,6 +33,21 @@ func TestParseValue(t *testing.T) { assert.Equal(t, "2021-01-01", val.(time.Time).Format(ext.PostgresDateFormat)) } } + { + // Timestamp NTZ + { + // String + val, err := parseValue("2021-01-04T09:32:00", columns.NewColumn("timestamp_ntz", typing.TimestampNTZ)) + assert.NoError(t, err) + assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time)) + } + { + // time.Time + val, err := parseValue(time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), columns.NewColumn("timestamp_ntz", typing.TimestampNTZ)) + assert.NoError(t, err) + assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time)) + } + } { val, err := parseValue("string value", columns.NewColumn("foo", typing.String)) assert.NoError(t, err) diff --git a/clients/redshift/dialect/typing.go b/clients/redshift/dialect/typing.go index be7a7dbba..b4d3de6be 100644 --- a/clients/redshift/dialect/typing.go +++ b/clients/redshift/dialect/typing.go @@ -47,12 +47,12 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string { return "BOOLEAN NULL" case typing.Date.Kind: return "DATE" + case typing.TimestampNTZ.Kind: + return "TIMESTAMP WITHOUT TIME ZONE" case typing.ETime.Kind: switch kd.ExtendedTimeDetails.Type { case ext.TimestampTZKindType: return "timestamp with time zone" - case ext.TimestampNTZKindType: - return "timestamp without time zone" case ext.TimeKindType: return "time" } @@ -106,7 +106,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) ( case "double precision": return typing.Float, nil case "timestamp", "timestamp without time zone": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "") + return typing.TimestampNTZ, nil case "timestamp with time zone": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") case "time without time zone": diff --git a/clients/redshift/dialect/typing_test.go b/clients/redshift/dialect/typing_test.go index 9a883aaac..e1f78b623 100644 --- a/clients/redshift/dialect/typing_test.go +++ b/clients/redshift/dialect/typing_test.go @@ -51,7 +51,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) { } { // Without timezone - assert.Equal(t, "timestamp without time zone", RedshiftDialect{}.DataTypeForKind(typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), false)) + assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false)) } } } @@ -140,8 +140,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) { { kd, err := dialect.KindForDataType("timestamp without time zone", "") assert.NoError(t, err) - assert.Equal(t, typing.ETime.Kind, kd.Kind) - assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type) + assert.Equal(t, typing.TimestampNTZ, kd) } { kd, err := dialect.KindForDataType("time without time zone", "") diff --git a/clients/shared/default_value.go b/clients/shared/default_value.go index 4e84a4b13..4036f95f8 100644 --- a/clients/shared/default_value.go +++ b/clients/shared/default_value.go @@ -28,6 +28,13 @@ func DefaultValue(column columns.Column, dialect sql.Dialect) (any, error) { } return sql.QuoteLiteral(_time.Format(ext.PostgresDateFormat)), nil + case typing.TimestampNTZ.Kind: + _time, err := ext.ParseTimestampNTZFromInterface(column.DefaultValue()) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", column.DefaultValue(), err) + } + + return sql.QuoteLiteral(_time.Format(ext.RFC3339NoTZ)), nil case typing.ETime.Kind: if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil { return nil, err diff --git a/clients/shared/default_value_test.go b/clients/shared/default_value_test.go index 824cf20d3..35fd91977 100644 --- a/clients/shared/default_value_test.go +++ b/clients/shared/default_value_test.go @@ -82,6 +82,11 @@ func TestColumn_DefaultValue(t *testing.T) { col: columns.NewColumnWithDefaultValue("", typing.Date, birthdayDateTime), expectedValue: "'2022-09-06'", }, + { + name: "timestamp_ntz", + col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthdayDateTime), + expectedValue: "'2022-09-06T03:19:24.942'", + }, { name: "time", col: columns.NewColumnWithDefaultValue("", timeKind, birthdayDateTime), diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index d76b530c0..f9985ee21 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -32,12 +32,12 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) return "boolean" case typing.Date.Kind: return "date" + case typing.TimestampNTZ.Kind: + return "timestamp_ntz" case typing.ETime.Kind: switch kindDetails.ExtendedTimeDetails.Type { case ext.TimestampTZKindType: return "timestamp_tz" - case ext.TimestampNTZKindType: - return "timestamp_ntz" case ext.TimeKindType: return "time" } @@ -100,7 +100,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing. case "timestamp_ltz", "timestamp_tz": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "") case "timestamp", "datetime", "timestamp_ntz": - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "") + return typing.TimestampNTZ, nil case "time": return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "") case "date": diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 7fe2e77b4..93d7491a9 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -189,7 +189,7 @@ func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) { for _, expectedDateTime := range expectedDateTimes { kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "") assert.NoError(t, err) - assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type, expectedDateTime) + assert.Equal(t, typing.TimestampNTZ, kd, expectedDateTime) } } } diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index b34fc9671..34092f740 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -206,10 +206,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { // Datetime without TZ is emitted in microseconds which is 1000x larger than nanoseconds. assert.Equal( r.T(), - ext.NewExtendedTime( - time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC), - ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ, - ), + time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC), evtData["ts_no_tz1"], ) assert.Equal(r.T(), time.Date(2023, time.February, 2, 17, 54, 11, 451000000, time.UTC), evt.GetExecutionTime()) diff --git a/lib/debezium/converters/timestamp.go b/lib/debezium/converters/timestamp.go index d439b2a34..3a4d5114a 100644 --- a/lib/debezium/converters/timestamp.go +++ b/lib/debezium/converters/timestamp.go @@ -4,17 +4,12 @@ import ( "time" "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/ext" ) type Timestamp struct{} -func (Timestamp) layout() string { - return ext.RFC3339MillisecondNoTZ -} - func (t Timestamp) ToKindDetails() (typing.KindDetails, error) { - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, t.layout()) + return typing.TimestampNTZ, nil } func (t Timestamp) Convert(value any) (any, error) { @@ -24,17 +19,13 @@ func (t Timestamp) Convert(value any) (any, error) { } // Represents the number of milliseconds since the epoch, and does not include timezone information. - return ext.NewExtendedTime(time.UnixMilli(castedValue).In(time.UTC), ext.TimestampNTZKindType, t.layout()), nil + return time.UnixMilli(castedValue).In(time.UTC), nil } type MicroTimestamp struct{} -func (MicroTimestamp) layout() string { - return ext.RFC3339MicrosecondNoTZ -} - func (mt MicroTimestamp) ToKindDetails() (typing.KindDetails, error) { - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, mt.layout()) + return typing.TimestampNTZ, nil } func (mt MicroTimestamp) Convert(value any) (any, error) { @@ -44,17 +35,13 @@ func (mt MicroTimestamp) Convert(value any) (any, error) { } // Represents the number of microseconds since the epoch, and does not include timezone information. - return ext.NewExtendedTime(time.UnixMicro(castedValue).In(time.UTC), ext.TimestampNTZKindType, mt.layout()), nil + return time.UnixMicro(castedValue).In(time.UTC), nil } type NanoTimestamp struct{} func (nt NanoTimestamp) ToKindDetails() (typing.KindDetails, error) { - return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, nt.layout()) -} - -func (NanoTimestamp) layout() string { - return ext.RFC3339NanosecondNoTZ + return typing.TimestampNTZ, nil } func (nt NanoTimestamp) Convert(value any) (any, error) { @@ -64,5 +51,5 @@ func (nt NanoTimestamp) Convert(value any) (any, error) { } // Represents the number of nanoseconds since the epoch, and does not include timezone information. - return ext.NewExtendedTime(time.UnixMicro(castedValue/1_000).In(time.UTC), ext.TimestampNTZKindType, nt.layout()), nil + return time.UnixMicro(castedValue / 1_000).In(time.UTC), nil } diff --git a/lib/debezium/converters/timestamp_test.go b/lib/debezium/converters/timestamp_test.go index 759ed7662..1f052abc5 100644 --- a/lib/debezium/converters/timestamp_test.go +++ b/lib/debezium/converters/timestamp_test.go @@ -2,6 +2,7 @@ package converters import ( "testing" + "time" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -11,68 +12,50 @@ import ( func TestTimestamp_Converter(t *testing.T) { kd, err := Timestamp{}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MillisecondNoTZ), kd) + assert.Equal(t, typing.TimestampNTZ, kd) { // Invalid conversion - _, err := Timestamp{}.Convert("invalid") + _, err = Timestamp{}.Convert("invalid") assert.ErrorContains(t, err, "expected type int64, got string") } { // Valid conversion converted, err := Timestamp{}.Convert(int64(1_725_058_799_089)) assert.NoError(t, err) - assert.Equal(t, "2024-08-30T22:59:59.089", converted.(*ext.ExtendedTime).GetTime().Format(Timestamp{}.layout())) - } - { - // ms is preserved despite it being all zeroes. - converted, err := Timestamp{}.Convert(int64(1_725_058_799_000)) - assert.NoError(t, err) - assert.Equal(t, "2024-08-30T22:59:59.000", converted.(*ext.ExtendedTime).GetTime().Format(Timestamp{}.layout())) + assert.Equal(t, "2024-08-30T22:59:59.089", converted.(time.Time).Format(ext.RFC3339NoTZ)) } } func TestMicroTimestamp_Converter(t *testing.T) { kd, err := MicroTimestamp{}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), kd) + assert.Equal(t, typing.TimestampNTZ, kd) { // Invalid conversion - _, err := MicroTimestamp{}.Convert("invalid") + _, err = MicroTimestamp{}.Convert("invalid") assert.ErrorContains(t, err, "expected type int64, got string") } { // Valid conversion converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_827_923)) assert.NoError(t, err) - assert.Equal(t, "2024-04-08T20:56:35.827923", converted.(*ext.ExtendedTime).GetTime().Format(MicroTimestamp{}.layout())) - } - { - // micros is preserved despite it being all zeroes. - converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_820_000)) - assert.NoError(t, err) - assert.Equal(t, "2024-04-08T20:56:35.820000", converted.(*ext.ExtendedTime).GetTime().Format(MicroTimestamp{}.layout())) + assert.Equal(t, "2024-04-08T20:56:35.827923", converted.(time.Time).Format(ext.RFC3339NoTZ)) } } func TestNanoTimestamp_Converter(t *testing.T) { kd, err := NanoTimestamp{}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339NanosecondNoTZ), kd) + assert.Equal(t, typing.TimestampNTZ, kd) { // Invalid conversion - _, err := NanoTimestamp{}.Convert("invalid") + _, err = NanoTimestamp{}.Convert("invalid") assert.ErrorContains(t, err, "expected type int64, got string") } { // Valid conversion converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_001_000)) assert.NoError(t, err) - assert.Equal(t, "2024-04-08T20:56:35.827001000", converted.(*ext.ExtendedTime).GetTime().Format(NanoTimestamp{}.layout())) - } - { - // nanos is preserved despite it being all zeroes. - converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_000_000)) - assert.NoError(t, err) - assert.Equal(t, "2024-04-08T20:56:35.827000000", converted.(*ext.ExtendedTime).GetTime().Format(NanoTimestamp{}.layout())) + assert.Equal(t, "2024-04-08T20:56:35.827001", converted.(time.Time).Format(ext.RFC3339NoTZ)) } } diff --git a/lib/debezium/keys_test.go b/lib/debezium/keys_test.go index 8543fa758..e42b6d797 100644 --- a/lib/debezium/keys_test.go +++ b/lib/debezium/keys_test.go @@ -4,8 +4,6 @@ import ( "testing" "time" - "github.com/artie-labs/transfer/lib/typing/ext" - "github.com/stretchr/testify/assert" ) @@ -119,7 +117,7 @@ func TestParsePartitionKeyStruct(t *testing.T) { `)) assert.NoError(t, err) assert.Equal(t, "339f3f2f-f29f-4f00-869e-476122310eff", keys["id"]) - assert.Equal(t, time.Date(2024, 4, 16, 1, 8, 19, 440000000, time.UTC), keys["created_at"].(*ext.ExtendedTime).GetTime()) + assert.Equal(t, time.Date(2024, 4, 16, 1, 8, 19, 440000000, time.UTC), keys["created_at"].(time.Time)) keys, err = parsePartitionKeyStruct([]byte(`{ "schema": { diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index a2b2fcab0..0c1efdcd6 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -240,8 +240,7 @@ func TestField_ToKindDetails(t *testing.T) { for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp} { kd, err := Field{DebeziumType: dbzType}.ToKindDetails() assert.NoError(t, err) - assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type) - assert.Equal(t, typing.ETime.Kind, kd.Kind) + assert.Equal(t, typing.TimestampNTZ, kd) } } { diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index f6ed3281a..1f8d25331 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -302,30 +302,30 @@ func TestField_ParseValue(t *testing.T) { field := Field{Type: Int64, DebeziumType: dbzType} value, err := field.ParseValue(int64(1_725_058_799_000)) assert.NoError(t, err) - assert.Equal(t, "2024-08-30T22:59:59.000", value.(*ext.ExtendedTime).GetTime().Format(ext.RFC3339MillisecondNoTZ)) + assert.Equal(t, "2024-08-30T22:59:59.000", value.(time.Time).Format(ext.RFC3339MillisecondNoTZ)) } } { // Nano timestamp field := Field{Type: Int64, DebeziumType: NanoTimestamp} - val, err := field.ParseValue(int64(1_712_609_795_827_000_000)) + val, err := field.ParseValue(int64(1_712_609_795_827_001_000)) assert.NoError(t, err) - assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), ext.TimestampNTZKindType, "2006-01-02T15:04:05.000000000"), val.(*ext.ExtendedTime)) + assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), val.(time.Time)) } { // Micro timestamp field := Field{Type: Int64, DebeziumType: MicroTimestamp} { // Int64 - val, err := field.ParseValue(int64(1_712_609_795_827_009)) + val, err := field.ParseValue(int64(1_712_609_795_827_000)) assert.NoError(t, err) - assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827009000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime)) + assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), val.(time.Time)) } { // Float64 - val, err := field.ParseValue(float64(1_712_609_795_827_001)) + val, err := field.ParseValue(float64(1_712_609_795_827_000)) assert.NoError(t, err) - assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime)) + assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), val.(time.Time)) } { // Invalid (string) diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index fb36a75ef..f0a8dd944 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -302,13 +302,6 @@ func mergeColumn(inMemoryCol columns.Column, destCol columns.Column) columns.Col inMemoryCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{} } - // If the column in the destination is a timestamp_tz and the in-memory column is a timestamp_ntz, we should update the layout to contain timezone locale. - if destCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampTZKindType && inMemoryCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampNTZKindType { - if inMemoryCol.KindDetails.ExtendedTimeDetails.Format != "" { - inMemoryCol.KindDetails.ExtendedTimeDetails.Format += ext.TimezoneOffsetFormat - } - } - // Copy over the type inMemoryCol.KindDetails.ExtendedTimeDetails.Type = destCol.KindDetails.ExtendedTimeDetails.Type // If the in-memory column has no format, we should use the format from the destination. diff --git a/lib/optimization/table_data_merge_columns_test.go b/lib/optimization/table_data_merge_columns_test.go index 7c73b989f..07105ed64 100644 --- a/lib/optimization/table_data_merge_columns_test.go +++ b/lib/optimization/table_data_merge_columns_test.go @@ -27,7 +27,7 @@ func TestTableData_UpdateInMemoryColumnsFromDestination_Tz(t *testing.T) { tableData.AddInMemoryCol( columns.NewColumn( "foo", - typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MillisecondNoTZ), + typing.TimestampNTZ, ), ) @@ -35,7 +35,7 @@ func TestTableData_UpdateInMemoryColumnsFromDestination_Tz(t *testing.T) { updatedColumn, isOk := tableData.inMemoryColumns.GetColumn("foo") assert.True(t, isOk) assert.Equal(t, ext.TimestampTZKindType, updatedColumn.KindDetails.ExtendedTimeDetails.Type) - assert.Equal(t, ext.RFC3339Millisecond, updatedColumn.KindDetails.ExtendedTimeDetails.Format) + assert.Equal(t, "2006-01-02T15:04:05.999999999Z07:00", updatedColumn.KindDetails.ExtendedTimeDetails.Format) } } diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go index ff0800849..278c4a1b3 100644 --- a/lib/optimization/table_data_test.go +++ b/lib/optimization/table_data_test.go @@ -374,7 +374,7 @@ func TestMergeColumn(t *testing.T) { { // Testing for backwards compatibility // in-memory column is TimestampNTZ, destination column is TimestampTZ - timestampNTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")) + timestampNTZColumn := columns.NewColumn("foo", typing.TimestampNTZ) timestampTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")) col := mergeColumn(timestampNTZColumn, timestampTZColumn) assert.Equal(t, ext.TimestampTZKindType, col.KindDetails.ExtendedTimeDetails.Type) diff --git a/lib/parquetutil/parse_values.go b/lib/parquetutil/parse_values.go index b88e44f60..53f041dc5 100644 --- a/lib/parquetutil/parse_values.go +++ b/lib/parquetutil/parse_values.go @@ -27,6 +27,13 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) { } return _time.Format(ext.PostgresDateFormat), nil + case typing.TimestampNTZ.Kind: + _time, err := ext.ParseTimestampNTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) + } + + return _time.UnixMilli(), nil case typing.ETime.Kind: if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil { return "", err diff --git a/lib/typing/ext/parse.go b/lib/typing/ext/parse.go index 794cb630d..600de6505 100644 --- a/lib/typing/ext/parse.go +++ b/lib/typing/ext/parse.go @@ -33,6 +33,19 @@ func ParseDateFromInterface(val any) (time.Time, error) { } } +func ParseTimestampNTZFromInterface(val any) (time.Time, error) { + switch convertedVal := val.(type) { + case time.Time: + return convertedVal, nil + case *ExtendedTime: + return convertedVal.GetTime(), nil + case string: + return parseTimestampNTZ(convertedVal) + default: + return time.Time{}, fmt.Errorf("unsupported type: %T", convertedVal) + } +} + func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, error) { switch convertedVal := val.(type) { case nil: @@ -55,8 +68,6 @@ func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, erro func ParseDateTime(value string, kindType ExtendedTimeKindType) (time.Time, error) { switch kindType { - case TimestampNTZKindType: - return parseTimestampNTZ(value) case TimestampTZKindType: return parseTimestampTZ(value) case TimeKindType: diff --git a/lib/typing/ext/parse_test.go b/lib/typing/ext/parse_test.go index ef03590c4..54f86ddc0 100644 --- a/lib/typing/ext/parse_test.go +++ b/lib/typing/ext/parse_test.go @@ -11,10 +11,8 @@ func TestParseFromInterface(t *testing.T) { { // Extended time var vals []*ExtendedTime - vals = append(vals, NewExtendedTime(time.Now().UTC(), TimestampNTZKindType, RFC3339NoTZ)) vals = append(vals, NewExtendedTime(time.Now().UTC(), TimestampTZKindType, ISO8601)) vals = append(vals, NewExtendedTime(time.Now().UTC(), TimeKindType, PostgresTimeFormat)) - for _, val := range vals { _time, err := ParseFromInterface(val, TimestampTZKindType) assert.NoError(t, err) @@ -90,34 +88,34 @@ func TestParseExtendedDateTime_TimestampTZ(t *testing.T) { assert.Equal(t, tsString, extTime.Format(time.RFC3339Nano)) } -func TestParseExtendedDateTime_TimestampNTZ(t *testing.T) { +func TestParseTimestampNTZFromInterface(t *testing.T) { { // No fractional seconds tsString := "2023-04-24T17:29:05" - extTime, err := ParseDateTime(tsString, TimestampNTZKindType) + ts, err := ParseTimestampNTZFromInterface(tsString) assert.NoError(t, err) - assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ)) + assert.Equal(t, tsString, ts.Format(RFC3339NoTZ)) } { // ms tsString := "2023-04-24T17:29:05.123" - extTime, err := ParseDateTime(tsString, TimestampNTZKindType) + ts, err := ParseTimestampNTZFromInterface(tsString) assert.NoError(t, err) - assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ)) + assert.Equal(t, tsString, ts.Format(RFC3339NoTZ)) } { // microseconds tsString := "2023-04-24T17:29:05.123456" - extTime, err := ParseDateTime(tsString, TimestampNTZKindType) + ts, err := ParseTimestampNTZFromInterface(tsString) assert.NoError(t, err) - assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ)) + assert.Equal(t, tsString, ts.Format(RFC3339NoTZ)) } { // ns tsString := "2023-04-24T17:29:05.123456789" - extTime, err := ParseDateTime(tsString, TimestampNTZKindType) + ts, err := ParseTimestampNTZFromInterface(tsString) assert.NoError(t, err) - assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ)) + assert.Equal(t, tsString, ts.Format(RFC3339NoTZ)) } } diff --git a/lib/typing/ext/time.go b/lib/typing/ext/time.go index 931b9be4a..7c0b0d99e 100644 --- a/lib/typing/ext/time.go +++ b/lib/typing/ext/time.go @@ -10,17 +10,14 @@ import ( type ExtendedTimeKindType string const ( - TimestampTZKindType ExtendedTimeKindType = "timestamp_tz" - TimestampNTZKindType ExtendedTimeKindType = "timestamp_ntz" - TimeKindType ExtendedTimeKindType = "time" + TimestampTZKindType ExtendedTimeKindType = "timestamp_tz" + TimeKindType ExtendedTimeKindType = "time" ) func (e ExtendedTimeKindType) defaultLayout() (string, error) { switch e { case TimestampTZKindType: return time.RFC3339Nano, nil - case TimestampNTZKindType: - return RFC3339NoTZ, nil case TimeKindType: return PostgresTimeFormat, nil default: diff --git a/lib/typing/parquet.go b/lib/typing/parquet.go index c753a3ae9..5d4f2799a 100644 --- a/lib/typing/parquet.go +++ b/lib/typing/parquet.go @@ -104,7 +104,6 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { }.String(), }, nil case Integer.Kind, ETime.Kind: - // Parquet doesn't have native time types, so we are using int64 and casting the value as UNIX ts. return &Field{ Tag: FieldTag{ diff --git a/lib/typing/typing.go b/lib/typing/typing.go index 12c929ee0..34034d9e3 100644 --- a/lib/typing/typing.go +++ b/lib/typing/typing.go @@ -79,6 +79,10 @@ var ( Kind: "date", } + TimestampNTZ = KindDetails{ + Kind: "timestamp_ntz", + } + ETime = KindDetails{ Kind: "extended_time", } diff --git a/lib/typing/values/string.go b/lib/typing/values/string.go index 311d461e5..0bb200f87 100644 --- a/lib/typing/values/string.go +++ b/lib/typing/values/string.go @@ -34,6 +34,13 @@ func ToString(colVal any, colKind typing.KindDetails) (string, error) { } return _time.Format(ext.PostgresDateFormat), nil + case typing.TimestampNTZ.Kind: + _time, err := ext.ParseTimestampNTZFromInterface(colVal) + if err != nil { + return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err) + } + + return _time.Format(ext.RFC3339NoTZ), nil case typing.ETime.Kind: if err := colKind.EnsureExtendedTimeDetails(); err != nil { return "", err diff --git a/lib/typing/values/string_test.go b/lib/typing/values/string_test.go index 7bfee5205..2b5b9f142 100644 --- a/lib/typing/values/string_test.go +++ b/lib/typing/values/string_test.go @@ -40,6 +40,21 @@ func TestToString(t *testing.T) { assert.Equal(t, "2021-01-01", value) } } + { + // Timestamp NTZ + { + // time.Time + value, err := ToString(time.Date(2021, time.January, 1, 17, 33, 4, 150_001_123, time.UTC), typing.TimestampNTZ) + assert.NoError(t, err) + assert.Equal(t, "2021-01-01T17:33:04.150001123", value) + } + { + // String + value, err := ToString("2021-01-01T17:33:04.150001123", typing.TimestampNTZ) + assert.NoError(t, err) + assert.Equal(t, time.Date(2021, time.January, 1, 17, 33, 4, 150_001_123, time.UTC).Format(ext.RFC3339NoTZ), value) + } + } { // ETime {