Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 28, 2024
1 parent c0e685c commit 31eeba7
Show file tree
Hide file tree
Showing 20 changed files with 79 additions and 60 deletions.
6 changes: 3 additions & 3 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
case typing.Struct.Kind:
// Struct is a tighter version of JSON that requires type casting like Struct<int64>
return "json"
case typing.Date.Kind:
return "date"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
Expand All @@ -55,8 +57,6 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
return "timestamp"
case ext.TimestampNTZKindType:
return "datetime"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")
return typing.Date, nil
default:
return typing.Invalid, nil
}
Expand Down
4 changes: 2 additions & 2 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"timestamp": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"date": typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, ""),
"date": typing.Date,
//Invalid
"foo": typing.Invalid,
"foofoo": typing.Invalid,
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestBigQueryDialect_KindForDataType_NoDataLoss(t *testing.T) {
kindDetails := []typing.KindDetails{
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, ""),
typing.Date,
typing.String,
typing.Boolean,
typing.Struct,
Expand Down
15 changes: 10 additions & 5 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_STRING
case typing.String.Kind:
fieldType = storagepb.TableFieldSchema_STRING
case typing.Date.Kind:
fieldType = storagepb.TableFieldSchema_DATE
case typing.ETime.Kind:
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
fieldType = storagepb.TableFieldSchema_TIME
case ext.DateKindType:
fieldType = storagepb.TableFieldSchema_DATE
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case ext.TimestampNTZKindType:
Expand Down Expand Up @@ -201,6 +201,14 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
}

message.Set(field, protoreflect.ValueOfString(castedValue))
case typing.Date.Kind:
_time, err := ext.ParseDateFromInterface(value)
if err != nil {
return nil, fmt.Errorf("failed to cast value as time.Time, value: '%v', err: %w", value, err)
}

daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case typing.ETime.Kind:
if err := column.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand All @@ -214,9 +222,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.TimeKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(_time)))
case ext.DateKindType:
daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case ext.TimestampTZKindType:
if err = timestamppb.New(_time).CheckValid(); err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestColumnToTableFieldSchema(t *testing.T) {
assert.Equal(t, storagepb.TableFieldSchema_TIME, fieldSchema.Type)
}
{
// ETime - Date:
fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")))
// Date
fieldSchema, err := columnToTableFieldSchema(columns.NewColumn("foo", typing.Date))
assert.NoError(t, err)
assert.Equal(t, storagepb.TableFieldSchema_DATE, fieldSchema.Type)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestRowToMessage(t *testing.T) {
columns.NewColumn("c_string", typing.String),
columns.NewColumn("c_string_decimal", typing.String),
columns.NewColumn("c_time", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")),
columns.NewColumn("c_date", typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")),
columns.NewColumn("c_date", typing.Date),
columns.NewColumn("c_datetime", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "")),
columns.NewColumn("c_struct", typing.Struct),
columns.NewColumn("c_array", typing.Array),
Expand All @@ -188,7 +188,7 @@ func TestRowToMessage(t *testing.T) {
"c_string": "foo bar",
"c_string_decimal": decimal.NewDecimal(numbers.MustParseDecimal("1.61803")),
"c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""),
"c_date": ext.NewExtendedTime(time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), ext.DateKindType, ""),
"c_date": time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC),
"c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.TimestampTZKindType, ""),
"c_struct": map[string]any{"baz": []string{"foo", "bar"}},
"c_array": []string{"foo", "bar"},
Expand Down
6 changes: 3 additions & 3 deletions clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "STRING"
case typing.Boolean.Kind:
return "BOOLEAN"
case typing.Date.Kind:
return "DATE"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
Expand All @@ -32,8 +34,6 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
// This is currently in public preview, to use this, the customer will need to enable [timestampNtz] in their delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case ext.DateKindType:
return "DATE"
case ext.TimeKindType:
return "STRING"
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD
case "boolean":
return typing.Boolean, nil
case "date":
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")
return typing.Date, nil
case "double", "float":
return typing.Float, nil
case "int":
Expand Down
4 changes: 2 additions & 2 deletions clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDatabricksDialect_DataTypeForKind(t *testing.T) {
// Times
{
// Date
assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.ETime.Kind, ExtendedTimeDetails: &ext.NestedKind{Type: ext.DateKindType}}, false))
assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.Date, false))
}
{
// Timestamp
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Date
kd, err := DatabricksDialect{}.KindForDataType("DATE", "")
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, ""), kd)
assert.Equal(t, typing.Date.Kind, kd)
}
{
// Double
Expand Down
6 changes: 3 additions & 3 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s
return "VARCHAR(MAX)"
case typing.Boolean.Kind:
return "BIT"
case typing.Date.Kind:
return "DATE"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "datetimeoffset"
case ext.TimestampNTZKindType:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")
return typing.Date, nil
case "bit":
return typing.Boolean, nil
case "text":
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestMSSQLDialect_KindForDataType(t *testing.T) {
"float": typing.Float,
"real": typing.Float,
"bit": typing.Boolean,
"date": typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, ""),
"date": typing.Date,
"time": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
"datetime": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
"datetime2": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
case typing.Boolean.Kind:
// We need to append `NULL` to let Redshift know that NULL is an acceptable data type.
return "BOOLEAN NULL"
case typing.Date.Kind:
return "DATE"
case typing.ETime.Kind:
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp with time zone"
case ext.TimestampNTZKindType:
return "timestamp without time zone"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) (
case "time without time zone":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")
return typing.Date, nil
case "boolean":
return typing.Boolean, nil
}
Expand Down
3 changes: 1 addition & 2 deletions clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) {
{
kd, err := dialect.KindForDataType("date", "")
assert.NoError(t, err)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, ext.DateKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.Date, kd)
}
}
}
8 changes: 1 addition & 7 deletions clients/shared/default_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ func TestColumn_DefaultValue(t *testing.T) {
birthdayDateTime, err := ext.ParseDateTime(birthday.Format(ext.ISO8601), ext.TimestampTZKindType)
assert.NoError(t, err)

// date
dateKind := typing.ETime
dateNestedKind, err := ext.NewNestedKind(ext.DateKindType, "")
assert.NoError(t, err)
dateKind.ExtendedTimeDetails = &dateNestedKind

// time
timeKind := typing.ETime
timeNestedKind, err := ext.NewNestedKind(ext.TimeKindType, "")
Expand Down Expand Up @@ -85,7 +79,7 @@ func TestColumn_DefaultValue(t *testing.T) {
},
{
name: "date",
col: columns.NewColumnWithDefaultValue("", dateKind, birthdayDateTime),
col: columns.NewColumnWithDefaultValue("", typing.Date, birthdayDateTime),
expectedValue: "'2022-09-06'",
},
{
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
return "variant"
case typing.Boolean.Kind:
return "boolean"
case typing.Date.Kind:
return "date"
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTZKindType:
return "timestamp_tz"
case ext.TimestampNTZKindType:
return "timestamp_ntz"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
return "time"
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing.
case "time":
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimeKindType, "")
case "date":
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, "")
return typing.Date, nil
default:
return typing.Invalid, nil
}
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestSnowflakeDialect_KindForDataType_NoDataLoss(t *testing.T) {
kindDetails := []typing.KindDetails{
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimeKindType, ""),
typing.MustNewExtendedTimeDetails(typing.ETime, ext.DateKindType, ""),
typing.Date,
typing.String,
typing.Boolean,
typing.Struct,
Expand Down
4 changes: 2 additions & 2 deletions lib/debezium/converters/date.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (Date) layout() string {
}

func (d Date) ToKindDetails() (typing.KindDetails, error) {
return typing.NewExtendedTimeDetails(typing.ETime, ext.DateKindType, d.layout())
return typing.Date, nil
}

func (d Date) Convert(value any) (any, error) {
Expand All @@ -25,5 +25,5 @@ func (d Date) Convert(value any) (any, error) {
}

// Represents the number of days since the epoch.
return ext.NewExtendedTime(time.UnixMilli(0).In(time.UTC).AddDate(0, 0, int(valueInt64)), ext.DateKindType, d.layout()), nil
return time.UnixMilli(0).In(time.UTC).AddDate(0, 0, int(valueInt64)), nil
}
2 changes: 1 addition & 1 deletion lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (t *TableData) DistinctDates(colName string) ([]string, error) {
return nil, fmt.Errorf("col: %v does not exist on row: %v", colName, row)
}

_time, err := ext.ParseFromInterface(val, ext.DateKindType)
_time, err := ext.ParseDateFromInterface(val)
if err != nil {
return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %w", colName, val, err)
}
Expand Down
9 changes: 8 additions & 1 deletion lib/parquetutil/parse_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) {
}

switch colKind.KindDetails.Kind {
case typing.Date.Kind:
_time, err := ext.ParseDateFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err)
}

return _time.Format(ext.PostgresDateFormat), nil
case typing.ETime.Kind:
if err := colKind.KindDetails.EnsureExtendedTimeDetails(); err != nil {
return "", err
Expand All @@ -30,7 +37,7 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err)
}

if colKind.KindDetails.ExtendedTimeDetails.Type == ext.DateKindType || colKind.KindDetails.ExtendedTimeDetails.Type == ext.TimeKindType {
if colKind.KindDetails.ExtendedTimeDetails.Type == ext.TimeKindType {
return _time.Format(colKind.KindDetails.ExtendedTimeDetails.Format), nil
}

Expand Down
28 changes: 18 additions & 10 deletions lib/typing/ext/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ func ParseTimeExactMatch(layout, value string) (time.Time, error) {
return ts, nil
}

func ParseDateFromInterface(val any) (time.Time, error) {
switch convertedVal := val.(type) {
case time.Time:
return convertedVal, nil
case *ExtendedTime:
return convertedVal.GetTime(), nil
case string:
return parseDate(convertedVal)
default:
return time.Time{}, fmt.Errorf("unsupported type: %T", convertedVal)
}
}

func ParseFromInterface(val any, kindType ExtendedTimeKindType) (time.Time, error) {
switch convertedVal := val.(type) {
case nil:
Expand All @@ -46,16 +59,6 @@ func ParseDateTime(value string, kindType ExtendedTimeKindType) (time.Time, erro
return parseTimestampNTZ(value)
case TimestampTZKindType:
return parseTimestampTZ(value)
case DateKindType:
// Try date first
if ts, err := parseDate(value); err == nil {
return ts, nil
}

// If that doesn't work, try timestamp
if ts, err := parseTimestampTZ(value); err == nil {
return ts, nil
}
case TimeKindType:
// Try time first
if ts, err := parseTime(value); err == nil {
Expand Down Expand Up @@ -97,6 +100,11 @@ func parseDate(value string) (time.Time, error) {
}
}

// If that doesn't work, try timestamp
if ts, err := parseTimestampTZ(value); err == nil {
return ts, nil
}

return time.Time{}, fmt.Errorf("unsupported value: %q", value)
}

Expand Down
2 changes: 0 additions & 2 deletions lib/typing/ext/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ func (e ExtendedTimeKindType) defaultLayout() (string, error) {
return time.RFC3339Nano, nil
case TimestampNTZKindType:
return RFC3339NoTZ, nil
case DateKindType:
return PostgresDateFormat, nil
case TimeKindType:
return PostgresTimeFormat, nil
default:
Expand Down
Loading

0 comments on commit 31eeba7

Please sign in to comment.