Skip to content

Commit

Permalink
[Extended Time] Pulling TIMESTAMP_TZ into a separate data type (#1000)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 1, 2024
1 parent e2d8601 commit 7c345bb
Show file tree
Hide file tree
Showing 40 changed files with 270 additions and 296 deletions.
2 changes: 2 additions & 0 deletions clients/bigquery/converters/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func (s StringConverter) Convert(value any) (any, error) {
return castedValue.Format(ext.PostgresDateFormat), nil
case typing.TimestampNTZ:
return castedValue.Format(ext.RFC3339NoTZ), nil
case typing.TimestampTZ:
return castedValue.Format(time.RFC3339Nano), nil
default:
return nil, fmt.Errorf("unexpected kind details: %q", s.kd.Kind)
}
Expand Down
16 changes: 11 additions & 5 deletions clients/bigquery/converters/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,29 @@ func TestStringConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "2021-01-01T09:10:12.400123991", val)
}
{
// Timestamp TZ
val, err := NewStringConverter(typing.TimestampTZ).Convert(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC))
assert.NoError(t, err)
assert.Equal(t, "2021-01-01T00:00:00Z", val)
}
{
// Invalid
_, err := NewStringConverter(typing.String).Convert(time.Date(2021, 1, 1, 9, 10, 12, 400_123_991, time.UTC))
assert.ErrorContains(t, err, `unexpected kind details: "string"`)
}
}
{
// Extended time
val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimestampTZKindType, "")).Convert(
// Extended Time
val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimeKindType, "")).Convert(
ext.NewExtendedTime(
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
ext.TimestampTZKindType,
time.Date(2021, 1, 1, 9, 10, 11, 123_456_789, time.UTC),
ext.TimeKindType,
"",
),
)
assert.NoError(t, err)
assert.Equal(t, "2021-01-01T00:00:00Z", val)
assert.Equal(t, "09:10:11.123456+00", val)
}
}

Expand Down
10 changes: 5 additions & 5 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
return "date"
case typing.TimestampNTZ.Kind:
return "datetime"
case typing.TimestampTZ.Kind:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
// We should be using TIMESTAMP (rather than DATETIME) since it represents an absolute point in time.
return "timestamp"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
case "array":
return typing.Array, nil
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
return typing.TimestampTZ, nil
case "datetime":
return typing.TimestampNTZ, nil
case "time":
Expand Down
4 changes: 2 additions & 2 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
"json": typing.Struct,
// Datetime
"datetime": typing.TimestampNTZ,
"timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"timestamp": typing.TimestampTZ,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"date": typing.Date,
//Invalid
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {

func TestBigQueryDialect_KindForDataType_NoDataLoss(t *testing.T) {
kindDetails := []typing.KindDetails{
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
typing.TimestampTZ,
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
typing.Date,
typing.String,
Expand Down
20 changes: 13 additions & 7 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_DATE
case typing.TimestampNTZ.Kind:
fieldType = storagepb.TableFieldSchema_DATETIME
case typing.TimestampTZ.Kind:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case typing.ETime.Kind:
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
fieldType = storagepb.TableFieldSchema_TIME
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
default:
return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down Expand Up @@ -216,6 +216,17 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
}

message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
case typing.TimestampTZ.Kind:
_time, err := ext.ParseTimestampTZFromInterface(value)
if err != nil {
return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err)
}

if err = timestamppb.New(_time).CheckValid(); err != nil {
return nil, err
}

message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand All @@ -229,11 +240,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(_time)))
case ext.TimestampTZKindType:
if err = timestamppb.New(_time).CheckValid(); err != nil {
return nil, err
}
message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
default:
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down
14 changes: 10 additions & 4 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ func TestColumnToTableFieldSchema(t *testing.T) {
assert.Equal(t, storagepb.TableFieldSchema_DATE, fieldSchema.Type)
}
{
// ETime - TimestampTZ:
fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")))
// Datetime (TimestampNTZ)
fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.TimestampNTZ))
assert.NoError(t, err)
assert.Equal(t, storagepb.TableFieldSchema_DATETIME, fieldSchema.Type)
}
{
// Timestamp (TimestampTZ)
fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.TimestampTZ))
assert.NoError(t, err)
assert.Equal(t, storagepb.TableFieldSchema_TIMESTAMP, fieldSchema.Type)
}
Expand Down Expand Up @@ -168,7 +174,7 @@ func TestRowToMessage(t *testing.T) {
columns.NewColumn("c_string", typing.String),
columns.NewColumn("c_string_decimal", typing.String),
columns.NewColumn("c_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")),
columns.NewColumn("c_timestamp", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")),
columns.NewColumn("c_timestamp", typing.TimestampTZ),
columns.NewColumn("c_date", typing.Date),
columns.NewColumn("c_datetime", typing.TimestampNTZ),
columns.NewColumn("c_struct", typing.Struct),
Expand All @@ -189,7 +195,7 @@ func TestRowToMessage(t *testing.T) {
"c_string": "foo bar",
"c_string_decimal": decimal.NewDecimal(numbers.MustParseDecimal("1.61803")),
"c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""),
"c_timestamp": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""),
"c_timestamp": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC),
"c_date": time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC),
"c_datetime": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC),
"c_struct": map[string]any{"baz": []string{"foo", "bar"}},
Expand Down
6 changes: 3 additions & 3 deletions clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
// This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case typing.TimestampTZ.Kind:
return "TIMESTAMP"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "TIMESTAMP"
case ext.TimeKindType:
return "STRING"
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD
case "smallint", "tinyint":
return typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, nil
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
return typing.TimestampTZ, nil
case "timestamp_ntz":
return typing.TimestampNTZ, nil
}
Expand Down
6 changes: 3 additions & 3 deletions clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func TestDatabricksDialect_DataTypeForKind(t *testing.T) {
}
{
// Timestamp
assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.ETime.Kind, ExtendedTimeDetails: &ext.NestedKind{Type: ext.TimestampTZKindType}}, false))
assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false))
}
{
// Timestamp (w/o timezone)
assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.ETime.Kind, ExtendedTimeDetails: &ext.NestedKind{Type: ext.TimestampTZKindType}}, false))
assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false))
}
{
// Time
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Timestamp
kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP", "")
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), kd)
assert.Equal(t, typing.TimestampTZ, kd)
}
{
// Timestamp NTZ
Expand Down
6 changes: 3 additions & 3 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s
case typing.TimestampNTZ.Kind:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case typing.TimestampTZ.Kind:
return "datetimeoffset"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "datetimeoffset"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ
"datetime2":
return typing.TimestampNTZ, nil
case "datetimeoffset":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
return typing.TimestampTZ, nil
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) {
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.TimestampNTZ,
"datetime2": typing.TimestampNTZ,
"datetimeoffset": typing.ETime,
"datetimeoffset": typing.TimestampTZ,
}

for col, expectedKind := range colToExpectedKind {
Expand Down
7 changes: 7 additions & 0 deletions clients/mssql/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func parseValue(colVal any, colKind columns.Column) (any, error) {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err)
}

return _time, nil
case typing.TimestampTZ.Kind:
_time, err := ext.ParseTimestampTZFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err)
}

return _time, nil
case typing.ETime.Kind:
if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil {
Expand Down
15 changes: 15 additions & 0 deletions clients/mssql/values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func TestParseValue(t *testing.T) {
assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time))
}
}
{
// Timestamp TZ
{
// String
val, err := parseValue("2021-01-04T09:32:00Z", columns.NewColumn("timestamp_tz", typing.TimestampTZ))
assert.NoError(t, err)
assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time))
}
{
// time.Time
val, err := parseValue(time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), columns.NewColumn("timestamp_tz", typing.TimestampTZ))
assert.NoError(t, err)
assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time))
}
}
{
val, err := parseValue("string value", columns.NewColumn("foo", typing.String))
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
return "DATE"
case typing.TimestampNTZ.Kind:
return "TIMESTAMP WITHOUT TIME ZONE"
case typing.TimestampTZ.Kind:
return "TIMESTAMP WITH TIME ZONE"
case typing.ETime.Kind:
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp with time zone"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) (
case "timestamp", "timestamp without time zone":
return typing.TimestampNTZ, nil
case "timestamp with time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
return typing.TimestampTZ, nil
case "time without time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
7 changes: 4 additions & 3 deletions clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) {
// Timestamps
{
// With timezone
assert.Equal(t, "timestamp with time zone", RedshiftDialect{}.DataTypeForKind(typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), false))
assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false))
}
{
// Without timezone
Expand Down Expand Up @@ -132,12 +132,13 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) {
{
// Times
{
// TimestampTZ
kd, err := dialect.KindForDataType("timestamp with time zone", "")
assert.NoError(t, err)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, ext.TimestampTZKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.TimestampTZ, kd)
}
{
// TimestampNTZ
kd, err := dialect.KindForDataType("timestamp without time zone", "")
assert.NoError(t, err)
assert.Equal(t, typing.TimestampNTZ, kd)
Expand Down
8 changes: 8 additions & 0 deletions clients/shared/default_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package shared
import (
"fmt"
"log/slog"
"time"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/destination"
Expand Down Expand Up @@ -35,6 +36,13 @@ func DefaultValue(column columns.Column, dialect sql.Dialect) (any, error) {
}

return sql.QuoteLiteral(_time.Format(ext.RFC3339NoTZ)), nil
case typing.TimestampTZ.Kind:
_time, err := ext.ParseTimestampTZFromInterface(column.DefaultValue())
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", column.DefaultValue(), err)
}

return sql.QuoteLiteral(_time.Format(time.RFC3339Nano)), nil
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand Down
19 changes: 5 additions & 14 deletions clients/shared/default_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,12 @@ var dialects = []sql.Dialect{

func TestColumn_DefaultValue(t *testing.T) {
birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)
birthdayDateTime, err := ext.ParseDateTime(birthday.Format(ext.ISO8601), ext.TimestampTZKindType)
assert.NoError(t, err)

// time
timeKind := typing.ETime
timeNestedKind, err := ext.NewNestedKind(ext.TimeKindType, "")
assert.NoError(t, err)
timeKind.ExtendedTimeDetails = &timeNestedKind

// date time
dateTimeKind := typing.ETime
dateTimeNestedKind, err := ext.NewNestedKind(ext.TimestampTZKindType, "")
assert.NoError(t, err)
dateTimeKind.ExtendedTimeDetails = &dateTimeNestedKind

testCases := []struct {
name string
col columns.Column
Expand Down Expand Up @@ -79,22 +70,22 @@ func TestColumn_DefaultValue(t *testing.T) {
},
{
name: "date",
col: columns.NewColumnWithDefaultValue("", typing.Date, birthdayDateTime),
col: columns.NewColumnWithDefaultValue("", typing.Date, birthday),
expectedValue: "'2022-09-06'",
},
{
name: "timestamp_ntz",
col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthdayDateTime),
col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthday),
expectedValue: "'2022-09-06T03:19:24.942'",
},
{
name: "time",
col: columns.NewColumnWithDefaultValue("", timeKind, birthdayDateTime),
col: columns.NewColumnWithDefaultValue("", timeKind, birthday),
expectedValue: "'03:19:24.942'",
},
{
name: "datetime",
col: columns.NewColumnWithDefaultValue("", dateTimeKind, birthdayDateTime),
name: "timestamp_tz",
col: columns.NewColumnWithDefaultValue("", typing.TimestampTZ, birthday),
expectedValue: "'2022-09-06T03:19:24.942Z'",
},
}
Expand Down
Loading

0 comments on commit 7c345bb

Please sign in to comment.