Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 31, 2024
1 parent 82c18cd commit e28b6b1
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 87 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
"record": typing.Struct,
"json": typing.Struct,
// Datetime
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"datetime": typing.TimestampNTZ,
"timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"date": typing.Date,
Expand Down
2 changes: 1 addition & 1 deletion clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Timestamp NTZ
kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP_NTZ", "")
assert.NoError(t, err)
assert.Equal(t, typing.TimestampNTZ.Kind, kd)
assert.Equal(t, typing.TimestampNTZ, kd)
}
{
// Variant
Expand Down
33 changes: 17 additions & 16 deletions clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) {
dialect := MSSQLDialect{}

colToExpectedKind := map[string]typing.KindDetails{
"char": typing.String,
"varchar": typing.String,
"nchar": typing.String,
"nvarchar": typing.String,
"ntext": typing.String,
"text": typing.String,
"smallint": typing.Integer,
"tinyint": typing.Integer,
"int": typing.Integer,
"float": typing.Float,
"real": typing.Float,
"bit": typing.Boolean,
"date": typing.Date,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"datetime2": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"char": typing.String,
"varchar": typing.String,
"nchar": typing.String,
"nvarchar": typing.String,
"ntext": typing.String,
"text": typing.String,
"smallint": typing.Integer,
"tinyint": typing.Integer,
"int": typing.Integer,
"float": typing.Float,
"real": typing.Float,
"bit": typing.Boolean,
"date": typing.Date,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.TimestampNTZ,
"datetime2": typing.TimestampNTZ,
"datetimeoffset": typing.ETime,
}

for col, expectedKind := range colToExpectedKind {
Expand Down
15 changes: 3 additions & 12 deletions lib/debezium/converters/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

type Timestamp struct{}
Expand All @@ -25,10 +24,6 @@ func (t Timestamp) Convert(value any) (any, error) {

// MicroTimestamp is the converter for timestamp values expressed as the number of
// microseconds since the epoch, without timezone information.
type MicroTimestamp struct{}

// layout returns the time layout used for MicroTimestamp values,
// which is ext.RFC3339MicrosecondNoTZ.
func (MicroTimestamp) layout() string {
return ext.RFC3339MicrosecondNoTZ
}

// ToKindDetails reports the kind that MicroTimestamp values map to.
// It always returns typing.TimestampNTZ and a nil error.
func (mt MicroTimestamp) ToKindDetails() (typing.KindDetails, error) {
return typing.TimestampNTZ, nil
}
Expand All @@ -40,17 +35,13 @@ func (mt MicroTimestamp) Convert(value any) (any, error) {
}

// Represents the number of microseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue).In(time.UTC), ext.TimestampNTZKindType, mt.layout()), nil
return time.UnixMicro(castedValue).In(time.UTC), nil
}

// NanoTimestamp is the converter for timestamp values expressed as the number of
// nanoseconds since the epoch, without timezone information.
type NanoTimestamp struct{}

func (nt NanoTimestamp) ToKindDetails() (typing.KindDetails, error) {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, nt.layout())
}

func (NanoTimestamp) layout() string {
return ext.RFC3339NanosecondNoTZ
return typing.TimestampNTZ, nil
}

func (nt NanoTimestamp) Convert(value any) (any, error) {
Expand All @@ -60,5 +51,5 @@ func (nt NanoTimestamp) Convert(value any) (any, error) {
}

// Represents the number of nanoseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue/1_000).In(time.UTC), ext.TimestampNTZKindType, nt.layout()), nil
return time.UnixMicro(castedValue / 1_000).In(time.UTC), nil
}
26 changes: 7 additions & 19 deletions lib/debezium/converters/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.TimestampNTZ, kd)
{
// Invalid conversion
_, err := Timestamp{}.Convert("invalid")
_, err = Timestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
Expand All @@ -29,45 +29,33 @@ func TestTimestamp_Converter(t *testing.T) {
func TestMicroTimestamp_Converter(t *testing.T) {
kd, err := MicroTimestamp{}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), kd)
assert.Equal(t, typing.TimestampNTZ, kd)
{
// Invalid conversion
_, err := MicroTimestamp{}.Convert("invalid")
_, err = MicroTimestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
// Valid conversion
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_827_923))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827923", converted.(*ext.ExtendedTime).GetTime().Format(MicroTimestamp{}.layout()))
}
{
// Microseconds are preserved even when the trailing digits are all zeroes.
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_820_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.820000", converted.(*ext.ExtendedTime).GetTime().Format(MicroTimestamp{}.layout()))
assert.Equal(t, "2024-04-08T20:56:35.827923", converted.(time.Time).Format(ext.RFC3339NoTZ))
}
}

func TestNanoTimestamp_Converter(t *testing.T) {
kd, err := NanoTimestamp{}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339NanosecondNoTZ), kd)
assert.Equal(t, typing.TimestampNTZ, kd)
{
// Invalid conversion
_, err := NanoTimestamp{}.Convert("invalid")
_, err = NanoTimestamp{}.Convert("invalid")
assert.ErrorContains(t, err, "expected type int64, got string")
}
{
// Valid conversion
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_001_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827001000", converted.(*ext.ExtendedTime).GetTime().Format(NanoTimestamp{}.layout()))
}
{
// Nanoseconds are preserved even when the trailing digits are all zeroes.
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_000_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827000000", converted.(*ext.ExtendedTime).GetTime().Format(NanoTimestamp{}.layout()))
assert.Equal(t, "2024-04-08T20:56:35.827001", converted.(time.Time).Format(ext.RFC3339NoTZ))
}
}
4 changes: 1 addition & 3 deletions lib/debezium/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -119,7 +117,7 @@ func TestParsePartitionKeyStruct(t *testing.T) {
`))
assert.NoError(t, err)
assert.Equal(t, "339f3f2f-f29f-4f00-869e-476122310eff", keys["id"])
assert.Equal(t, time.Date(2024, 4, 16, 1, 8, 19, 440000000, time.UTC), keys["created_at"].(*ext.ExtendedTime).GetTime())
assert.Equal(t, time.Date(2024, 4, 16, 1, 8, 19, 440000000, time.UTC), keys["created_at"].(time.Time))

keys, err = parsePartitionKeyStruct([]byte(`{
"schema": {
Expand Down
3 changes: 1 addition & 2 deletions lib/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ func TestField_ToKindDetails(t *testing.T) {
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, typing.TimestampNTZ, kd)
}
}
{
Expand Down
14 changes: 7 additions & 7 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,30 +302,30 @@ func TestField_ParseValue(t *testing.T) {
field := Field{Type: Int64, DebeziumType: dbzType}
value, err := field.ParseValue(int64(1_725_058_799_000))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.000", value.(*ext.ExtendedTime).GetTime().Format(ext.RFC3339MillisecondNoTZ))
assert.Equal(t, "2024-08-30T22:59:59.000", value.(time.Time).Format(ext.RFC3339MillisecondNoTZ))
}
}
{
// Nano timestamp
field := Field{Type: Int64, DebeziumType: NanoTimestamp}
val, err := field.ParseValue(int64(1_712_609_795_827_000_000))
val, err := field.ParseValue(int64(1_712_609_795_827_001_000))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), ext.TimestampNTZKindType, "2006-01-02T15:04:05.000000000"), val.(*ext.ExtendedTime))
assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), val.(time.Time))
}
{
// Micro timestamp
field := Field{Type: Int64, DebeziumType: MicroTimestamp}
{
// Int64
val, err := field.ParseValue(int64(1_712_609_795_827_009))
val, err := field.ParseValue(int64(1_712_609_795_827_000))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827009000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime))
assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), val.(time.Time))
}
{
// Float64
val, err := field.ParseValue(float64(1_712_609_795_827_001))
val, err := field.ParseValue(float64(1_712_609_795_827_000))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime))
assert.Equal(t, time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), val.(time.Time))
}
{
// Invalid (string)
Expand Down
7 changes: 0 additions & 7 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,6 @@ func mergeColumn(inMemoryCol columns.Column, destCol columns.Column) columns.Col
inMemoryCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{}
}

// If the destination column is a timestamp_tz and the in-memory column is a timestamp_ntz, append the timezone offset format to the in-memory layout (only when a layout is already set).
if destCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampTZKindType && inMemoryCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampNTZKindType {
if inMemoryCol.KindDetails.ExtendedTimeDetails.Format != "" {
inMemoryCol.KindDetails.ExtendedTimeDetails.Format += ext.TimezoneOffsetFormat
}
}

// Copy over the type
inMemoryCol.KindDetails.ExtendedTimeDetails.Type = destCol.KindDetails.ExtendedTimeDetails.Type
// If the in-memory column has no format, we should use the format from the destination.
Expand Down
4 changes: 2 additions & 2 deletions lib/optimization/table_data_merge_columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func TestTableData_UpdateInMemoryColumnsFromDestination_Tz(t *testing.T) {
tableData.AddInMemoryCol(
columns.NewColumn(
"foo",
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MillisecondNoTZ),
typing.TimestampNTZ,
),
)

assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))))
updatedColumn, isOk := tableData.inMemoryColumns.GetColumn("foo")
assert.True(t, isOk)
assert.Equal(t, ext.TimestampTZKindType, updatedColumn.KindDetails.ExtendedTimeDetails.Type)
assert.Equal(t, ext.RFC3339Millisecond, updatedColumn.KindDetails.ExtendedTimeDetails.Format)
assert.Equal(t, "2006-01-02T15:04:05.999999999Z07:00", updatedColumn.KindDetails.ExtendedTimeDetails.Format)
}

}
Expand Down
2 changes: 1 addition & 1 deletion lib/optimization/table_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestMergeColumn(t *testing.T) {
{
// Testing for backwards compatibility
// in-memory column is TimestampNTZ, destination column is TimestampTZ
timestampNTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""))
timestampNTZColumn := columns.NewColumn("foo", typing.TimestampNTZ)
timestampTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))
col := mergeColumn(timestampNTZColumn, timestampTZColumn)
assert.Equal(t, ext.TimestampTZKindType, col.KindDetails.ExtendedTimeDetails.Type)
Expand Down
2 changes: 0 additions & 2 deletions lib/typing/ext/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, erro

func ParseDateTime(value string, kindType ExtendedTimeKindType) (time.Time, error) {
switch kindType {
case TimestampNTZKindType:
return parseTimestampNTZ(value)
case TimestampTZKindType:
return parseTimestampTZ(value)
case TimeKindType:
Expand Down
20 changes: 9 additions & 11 deletions lib/typing/ext/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ func TestParseFromInterface(t *testing.T) {
{
// Extended time
var vals []*ExtendedTime
vals = append(vals, NewExtendedTime(time.Now().UTC(), TimestampNTZKindType, RFC3339NoTZ))
vals = append(vals, NewExtendedTime(time.Now().UTC(), TimestampTZKindType, ISO8601))
vals = append(vals, NewExtendedTime(time.Now().UTC(), TimeKindType, PostgresTimeFormat))

for _, val := range vals {
_time, err := ParseFromInterface(val, TimestampTZKindType)
assert.NoError(t, err)
Expand Down Expand Up @@ -90,34 +88,34 @@ func TestParseExtendedDateTime_TimestampTZ(t *testing.T) {
assert.Equal(t, tsString, extTime.Format(time.RFC3339Nano))
}

func TestParseExtendedDateTime_TimestampNTZ(t *testing.T) {
func TestParseTimestampNTZFromInterface(t *testing.T) {
{
// No fractional seconds
tsString := "2023-04-24T17:29:05"
extTime, err := ParseDateTime(tsString, TimestampNTZKindType)
ts, err := ParseTimestampNTZFromInterface(tsString)
assert.NoError(t, err)
assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ))
assert.Equal(t, tsString, ts.Format(RFC3339NoTZ))
}
{
// ms
tsString := "2023-04-24T17:29:05.123"
extTime, err := ParseDateTime(tsString, TimestampNTZKindType)
ts, err := ParseTimestampNTZFromInterface(tsString)
assert.NoError(t, err)
assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ))
assert.Equal(t, tsString, ts.Format(RFC3339NoTZ))
}
{
// microseconds
tsString := "2023-04-24T17:29:05.123456"
extTime, err := ParseDateTime(tsString, TimestampNTZKindType)
ts, err := ParseTimestampNTZFromInterface(tsString)
assert.NoError(t, err)
assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ))
assert.Equal(t, tsString, ts.Format(RFC3339NoTZ))
}
{
// ns
tsString := "2023-04-24T17:29:05.123456789"
extTime, err := ParseDateTime(tsString, TimestampNTZKindType)
ts, err := ParseTimestampNTZFromInterface(tsString)
assert.NoError(t, err)
assert.Equal(t, tsString, extTime.Format(RFC3339NoTZ))
assert.Equal(t, tsString, ts.Format(RFC3339NoTZ))
}
}

Expand Down
5 changes: 2 additions & 3 deletions lib/typing/ext/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
type ExtendedTimeKindType string

const (
TimestampTZKindType ExtendedTimeKindType = "timestamp_tz"
TimestampNTZKindType ExtendedTimeKindType = "timestamp_ntz"
TimeKindType ExtendedTimeKindType = "time"
TimestampTZKindType ExtendedTimeKindType = "timestamp_tz"
TimeKindType ExtendedTimeKindType = "time"
)

func (e ExtendedTimeKindType) defaultLayout() (string, error) {
Expand Down

0 comments on commit e28b6b1

Please sign in to comment.