Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 31, 2024
1 parent 58b0e38 commit 82c18cd
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 48 deletions.
6 changes: 3 additions & 3 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
return "json"
case typing.Date.Kind:
return "date"
case typing.TimestampNTZ.Kind:
return "datetime"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case ext.TimestampNTZKindType:
return "datetime"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "datetime":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
13 changes: 9 additions & 4 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_STRING
case typing.Date.Kind:
fieldType = storagepb.TableFieldSchema_DATE
case typing.TimestampNTZ.Kind:
fieldType = storagepb.TableFieldSchema_DATETIME
case typing.ETime.Kind:
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
fieldType = storagepb.TableFieldSchema_TIME
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case ext.TimestampNTZKindType:
fieldType = storagepb.TableFieldSchema_DATETIME
default:
return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down Expand Up @@ -209,6 +209,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto

daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case typing.TimestampNTZ.Kind:
_time, err := ext.ParseTimestampNTZFromInterface(value)
if err != nil {
return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err)
}

message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand All @@ -227,8 +234,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
return nil, err
}
message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
case ext.TimestampNTZKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
default:
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down
10 changes: 5 additions & 5 deletions clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "BOOLEAN"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
// This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "TIMESTAMP"
case ext.TimestampNTZKindType:
// This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case ext.TimeKindType:
return "STRING"
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD
case "timestamp":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "timestamp_ntz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
}

return typing.Invalid, fmt.Errorf("unsupported data type: %q", rawType)
Expand Down
2 changes: 1 addition & 1 deletion clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Timestamp NTZ
kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP_NTZ", "")
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), kd)
assert.Equal(t, typing.TimestampNTZ.Kind, kd)
}
{
// Variant
Expand Down
8 changes: 4 additions & 4 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s
return "BIT"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "datetimeoffset"
case ext.TimestampNTZKindType:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ
case
"datetime",
"datetime2":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "datetimeoffset":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "time":
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
return "BOOLEAN NULL"
case typing.Date.Kind:
return "DATE"
case typing.TimestampNTZ.Kind:
return "TIMESTAMP WITHOUT TIME ZONE"
case typing.ETime.Kind:
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp with time zone"
case ext.TimestampNTZKindType:
return "timestamp without time zone"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) (
case "double precision":
return typing.Float, nil
case "timestamp", "timestamp without time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "timestamp with time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "time without time zone":
Expand Down
5 changes: 2 additions & 3 deletions clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) {
}
{
// Without timezone
assert.Equal(t, "timestamp without time zone", RedshiftDialect{}.DataTypeForKind(typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""), false))
assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false))
}
}
}
Expand Down Expand Up @@ -140,8 +140,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) {
{
kd, err := dialect.KindForDataType("timestamp without time zone", "")
assert.NoError(t, err)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.TimestampNTZ, kd)
}
{
kd, err := dialect.KindForDataType("time without time zone", "")
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "boolean"
case typing.Date.Kind:
return "date"
case typing.TimestampNTZ.Kind:
return "timestamp_ntz"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp_tz"
case ext.TimestampNTZKindType:
return "timestamp_ntz"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing.
case "timestamp_ltz", "timestamp_tz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")
case "timestamp", "datetime", "timestamp_ntz":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, "")
return typing.TimestampNTZ, nil
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) {
for _, expectedDateTime := range expectedDateTimes {
kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "")
assert.NoError(t, err)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type, expectedDateTime)
assert.Equal(t, typing.TimestampNTZ, kd, expectedDateTime)
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
// Datetime without TZ is emitted in microseconds which is 1000x larger than nanoseconds.
assert.Equal(
r.T(),
ext.NewExtendedTime(
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ,
),
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
evtData["ts_no_tz1"],
)
assert.Equal(r.T(), time.Date(2023, time.February, 2, 17, 54, 11, 451000000, time.UTC), evt.GetExecutionTime())
Expand Down
10 changes: 3 additions & 7 deletions lib/debezium/converters/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ import (

type Timestamp struct{}

func (Timestamp) layout() string {
return ext.RFC3339MillisecondNoTZ
}

func (t Timestamp) ToKindDetails() (typing.KindDetails, error) {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, t.layout())
return typing.TimestampNTZ, nil
}

func (t Timestamp) Convert(value any) (any, error) {
Expand All @@ -24,7 +20,7 @@ func (t Timestamp) Convert(value any) (any, error) {
}

// Represents the number of milliseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMilli(castedValue).In(time.UTC), ext.TimestampNTZKindType, t.layout()), nil
return time.UnixMilli(castedValue).In(time.UTC), nil
}

type MicroTimestamp struct{}
Expand All @@ -34,7 +30,7 @@ func (MicroTimestamp) layout() string {
}

func (mt MicroTimestamp) ToKindDetails() (typing.KindDetails, error) {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, mt.layout())
return typing.TimestampNTZ, nil
}

func (mt MicroTimestamp) Convert(value any) (any, error) {
Expand Down
11 changes: 3 additions & 8 deletions lib/debezium/converters/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package converters

import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
Expand All @@ -11,7 +12,7 @@ import (
func TestTimestamp_Converter(t *testing.T) {
kd, err := Timestamp{}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MillisecondNoTZ), kd)
assert.Equal(t, typing.TimestampNTZ, kd)
{
// Invalid conversion
_, err := Timestamp{}.Convert("invalid")
Expand All @@ -21,13 +22,7 @@ func TestTimestamp_Converter(t *testing.T) {
// Valid conversion
converted, err := Timestamp{}.Convert(int64(1_725_058_799_089))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.089", converted.(*ext.ExtendedTime).GetTime().Format(Timestamp{}.layout()))
}
{
// ms is preserved despite it being all zeroes.
converted, err := Timestamp{}.Convert(int64(1_725_058_799_000))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.000", converted.(*ext.ExtendedTime).GetTime().Format(Timestamp{}.layout()))
assert.Equal(t, "2024-08-30T22:59:59.089", converted.(time.Time).Format(ext.RFC3339NoTZ))
}
}

Expand Down
13 changes: 13 additions & 0 deletions lib/typing/ext/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ func ParseDateFromInterface(val any) (time.Time, error) {
}
}

func ParseTimestampNTZFromInterface(val any) (time.Time, error) {
switch convertedVal := val.(type) {
case time.Time:
return convertedVal, nil
case *ExtendedTime:
return convertedVal.GetTime(), nil
case string:
return parseTimestampNTZ(convertedVal)
default:
return time.Time{}, fmt.Errorf("unsupported type: %T", convertedVal)
}
}

func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, error) {
switch convertedVal := val.(type) {
case nil:
Expand Down
2 changes: 0 additions & 2 deletions lib/typing/ext/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ func (e ExtendedTimeKindType) defaultLayout() (string, error) {
switch e {
case TimestampTZKindType:
return time.RFC3339Nano, nil
case TimestampNTZKindType:
return RFC3339NoTZ, nil
case TimeKindType:
return PostgresTimeFormat, nil
default:
Expand Down
4 changes: 4 additions & 0 deletions lib/typing/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ var (
Kind: "date",
}

TimestampNTZ = KindDetails{
Kind: "timestamp_ntz",
}

ETime = KindDetails{
Kind: "extended_time",
}
Expand Down

0 comments on commit 82c18cd

Please sign in to comment.