Skip to content

Commit

Permalink
[Extended Time] Pulling TIMESTAMP_NTZ into a separate data type (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 31, 2024
1 parent b67409b commit 2b16ce3
Show file tree
Hide file tree
Showing 33 changed files with 175 additions and 138 deletions.
6 changes: 3 additions & 3 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
return "json"
case typing.Date.Kind:
return "date"
case typing.TimestampNTZ.Kind:
return "datetime"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case ext.TimestampNTZKindType:
return "datetime"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "datetime":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
2 changes: 1 addition & 1 deletion clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
"record": typing.Struct,
"json": typing.Struct,
// Datetime
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"datetime": typing.TimestampNTZ,
"timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"date": typing.Date,
Expand Down
13 changes: 9 additions & 4 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_STRING
case typing.Date.Kind:
fieldType = storagepb.TableFieldSchema_DATE
case typing.TimestampNTZ.Kind:
fieldType = storagepb.TableFieldSchema_DATETIME
case typing.ETime.Kind:
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
fieldType = storagepb.TableFieldSchema_TIME
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case ext.TimestampNTZKindType:
fieldType = storagepb.TableFieldSchema_DATETIME
default:
return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down Expand Up @@ -209,6 +209,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto

daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case typing.TimestampNTZ.Kind:
_time, err := ext.ParseTimestampNTZFromInterface(value)
if err != nil {
return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err)
}

message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand All @@ -227,8 +234,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
return nil, err
}
message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
case ext.TimestampNTZKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
default:
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down
9 changes: 6 additions & 3 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func TestRowToMessage(t *testing.T) {
columns.NewColumn("c_string", typing.String),
columns.NewColumn("c_string_decimal", typing.String),
columns.NewColumn("c_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")),
columns.NewColumn("c_timestamp", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")),
columns.NewColumn("c_date", typing.Date),
columns.NewColumn("c_datetime", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")),
columns.NewColumn("c_datetime", typing.TimestampNTZ),
columns.NewColumn("c_struct", typing.Struct),
columns.NewColumn("c_array", typing.Array),
}
Expand All @@ -188,8 +189,9 @@ func TestRowToMessage(t *testing.T) {
"c_string": "foo bar",
"c_string_decimal": decimal.NewDecimal(numbers.MustParseDecimal("1.61803")),
"c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""),
"c_timestamp": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""),
"c_date": time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC),
"c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""),
"c_datetime": time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC),
"c_struct": map[string]any{"baz": []string{"foo", "bar"}},
"c_array": []string{"foo", "bar"},
}
Expand Down Expand Up @@ -220,8 +222,9 @@ func TestRowToMessage(t *testing.T) {
"cString": "foo bar",
"cStringDecimal": "1.61803",
"cTime": "17521704960",
"cTimestamp": "981173106000000",
"cDate": float64(11356),
"cDatetime": "981173106000000",
"cDatetime": "140817083031093248",
"cStruct": `{"baz":["foo","bar"]}`,
"cArray": []any{"foo", "bar"},
}, result)
Expand Down
10 changes: 5 additions & 5 deletions clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "BOOLEAN"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
// TIMESTAMP_NTZ is currently in public preview; to use it, the customer will need to enable [timestampNtz] in their Delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "TIMESTAMP"
case ext.TimestampNTZKindType:
// TIMESTAMP_NTZ is currently in public preview; to use it, the customer will need to enable [timestampNtz] in their Delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case ext.TimeKindType:
return "STRING"
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "timestamp_ntz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
}

return typing.Invalid, fmt.Errorf("unsupported data type: %q", rawType)
Expand Down
2 changes: 1 addition & 1 deletion clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Timestamp NTZ
kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP_NTZ", "")
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), kd)
assert.Equal(t, typing.TimestampNTZ, kd)
}
{
// Variant
Expand Down
8 changes: 4 additions & 4 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s
return "BIT"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "datetimeoffset"
case ext.TimestampNTZKindType:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ
case
"datetime",
"datetime2":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "datetimeoffset":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "time":
Expand Down
33 changes: 17 additions & 16 deletions clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) {
dialect := MSSQLDialect{}

colToExpectedKind := map[string]typing.KindDetails{
"char": typing.String,
"varchar": typing.String,
"nchar": typing.String,
"nvarchar": typing.String,
"ntext": typing.String,
"text": typing.String,
"smallint": typing.Integer,
"tinyint": typing.Integer,
"int": typing.Integer,
"float": typing.Float,
"real": typing.Float,
"bit": typing.Boolean,
"date": typing.Date,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"datetime2": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"char": typing.String,
"varchar": typing.String,
"nchar": typing.String,
"nvarchar": typing.String,
"ntext": typing.String,
"text": typing.String,
"smallint": typing.Integer,
"tinyint": typing.Integer,
"int": typing.Integer,
"float": typing.Float,
"real": typing.Float,
"bit": typing.Boolean,
"date": typing.Date,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.TimestampNTZ,
"datetime2": typing.TimestampNTZ,
"datetimeoffset": typing.ETime,
}

for col, expectedKind := range colToExpectedKind {
Expand Down
7 changes: 7 additions & 0 deletions clients/mssql/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ func parseValue(colVal any, colKind columns.Column) (any, error) {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err)
}

return _time, nil
case typing.TimestampNTZ.Kind:
_time, err := ext.ParseTimestampNTZFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", colVal, err)
}

return _time, nil
case typing.ETime.Kind:
if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil {
Expand Down
15 changes: 15 additions & 0 deletions clients/mssql/values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ func TestParseValue(t *testing.T) {
assert.Equal(t, "2021-01-01", val.(time.Time).Format(ext.PostgresDateFormat))
}
}
{
// Timestamp NTZ
{
// String
val, err := parseValue("2021-01-04T09:32:00", columns.NewColumn("timestamp_ntz", typing.TimestampNTZ))
assert.NoError(t, err)
assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time))
}
{
// time.Time
val, err := parseValue(time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), columns.NewColumn("timestamp_ntz", typing.TimestampNTZ))
assert.NoError(t, err)
assert.Equal(t, time.Date(2021, time.January, 4, 9, 32, 0, 0, time.UTC), val.(time.Time))
}
}
{
val, err := parseValue("string value", columns.NewColumn("foo", typing.String))
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
return "BOOLEAN NULL"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
return "TIMESTAMP WITHOUT TIME ZONE"
case typing.ETime.Kind:
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp with time zone"
case ext.TimestampNTZKindType:
return "timestamp without time zone"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) (
case "double precision":
return typing.Float, nil
case "timestamp", "timestamp without time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "timestamp with time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "time without time zone":
Expand Down
5 changes: 2 additions & 3 deletions clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) {
}
{
// Without timezone
assert.Equal(t, "timestamp without time zone", RedshiftDialect{}.DataTypeForKind(typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), false))
assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false))
}
}
}
Expand Down Expand Up @@ -140,8 +140,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) {
{
kd, err := dialect.KindForDataType("timestamp without time zone", "")
assert.NoError(t, err)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.TimestampNTZ, kd)
}
{
kd, err := dialect.KindForDataType("time without time zone", "")
Expand Down
7 changes: 7 additions & 0 deletions clients/shared/default_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ func DefaultValue(column columns.Column, dialect sql.Dialect) (any, error) {
}

return sql.QuoteLiteral(_time.Format(ext.PostgresDateFormat)), nil
case typing.TimestampNTZ.Kind:
_time, err := ext.ParseTimestampNTZFromInterface(column.DefaultValue())
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: '%v', err: %w", column.DefaultValue(), err)
}

return sql.QuoteLiteral(_time.Format(ext.RFC3339NoTZ)), nil
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions clients/shared/default_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func TestColumn_DefaultValue(t *testing.T) {
col: columns.NewColumnWithDefaultValue("", typing.Date, birthdayDateTime),
expectedValue: "'2022-09-06'",
},
{
name: "timestamp_ntz",
col: columns.NewColumnWithDefaultValue("", typing.TimestampNTZ, birthdayDateTime),
expectedValue: "'2022-09-06T03:19:24.942'",
},
{
name: "time",
col: columns.NewColumnWithDefaultValue("", timeKind, birthdayDateTime),
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "boolean"
case typing.Date.Kind:
return "date"
case typing.TimestampNTZ.Kind:
return "timestamp_ntz"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp_tz"
case ext.TimestampNTZKindType:
return "timestamp_ntz"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing.
case "timestamp_ltz", "timestamp_tz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "timestamp", "datetime", "timestamp_ntz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) {
for _, expectedDateTime := range expectedDateTimes {
kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "")
assert.NoError(t, err)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type, expectedDateTime)
assert.Equal(t, typing.TimestampNTZ, kd, expectedDateTime)
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
// Datetime without TZ is emitted in microseconds, so the value is multiplied by 1000 to get the nanoseconds that time.Date expects.
assert.Equal(
r.T(),
ext.NewExtendedTime(
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ,
),
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
evtData["ts_no_tz1"],
)
assert.Equal(r.T(), time.Date(2023, time.February, 2, 17, 54, 11, 451000000, time.UTC), evt.GetExecutionTime())
Expand Down
Loading

0 comments on commit 2b16ce3

Please sign in to comment.