Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Extended Time] Fix parsing timezones #984

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions clients/bigquery/converters/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package converters
import (
"fmt"
"strconv"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
Expand All @@ -25,6 +26,12 @@ func (s StringConverter) Convert(value any) (any, error) {
return castedValue.String(), nil
case bool:
return fmt.Sprint(castedValue), nil
case time.Time:
if err := s.kd.EnsureExtendedTimeDetails(); err != nil {
return nil, err
}

return castedValue.Format(s.kd.ExtendedTimeDetails.Format), nil
case *ext.ExtendedTime:
if err := s.kd.EnsureExtendedTimeDetails(); err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions clients/bigquery/converters/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func TestStringConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "2021-01-01T00:00:00Z", val)
}
{
// time.Time
val, err := NewStringConverter(typing.MustNewExtendedTimeDetails(typing.String, ext.TimestampTZKindType, "")).Convert(
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, "2021-01-01T00:00:00Z", val)
}
}

func TestInt64Converter_Convert(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/converters/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (m MicroTime) Convert(value any) (any, error) {
type ZonedTimestamp struct{}

func (ZonedTimestamp) layout() string {
return "2006-01-02T15:04:05.999999999Z"
return ext.RFC3339
}

func (z ZonedTimestamp) ToKindDetails() (typing.KindDetails, error) {
Expand Down
8 changes: 8 additions & 0 deletions lib/debezium/converters/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ func TestZonedTimestamp_Convert(t *testing.T) {
assert.Equal(t, expectedExtTime, val.(*ext.ExtendedTime))
assert.Equal(t, "2025-09-13T00:00:00.123456789Z", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout()))
}
{
// Testing TZ offset
val, err := ZonedTimestamp{}.Convert("2025-09-13T00:00:00.123456789+07:00")
assert.NoError(t, err)

assert.Equal(t, time.Date(2025, time.September, 13, 0, 0, 0, 123456789, time.FixedZone("", 7*60*60)), val.(*ext.ExtendedTime).GetTime())
assert.Equal(t, "2025-09-13T00:00:00.123456789+07:00", val.(*ext.ExtendedTime).GetTime().Format(ZonedTimestamp{}.layout()))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestField_ToKindDetails(t *testing.T) {
for _, dbzType := range []SupportedDebeziumType{ZonedTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, "2006-01-02T15:04:05.999999999Z"), kd)
assert.Equal(t, typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ext.RFC3339), kd)
}
}
{
Expand Down
74 changes: 39 additions & 35 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,49 +273,53 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro
}

if found {
// TODO: Move this whole block into a function and add unit-tests.
inMemoryCol.KindDetails.Kind = foundColumn.KindDetails.Kind
// Copy over backfilled
inMemoryCol.SetBackfilled(foundColumn.Backfilled())

// Copy over string precision, if it exists
if foundColumn.KindDetails.OptionalStringPrecision != nil {
inMemoryCol.KindDetails.OptionalStringPrecision = foundColumn.KindDetails.OptionalStringPrecision
}
t.inMemoryColumns.UpdateColumn(mergeColumn(inMemoryCol, foundColumn))
}
}

// Copy over integer kind, if exists.
if foundColumn.KindDetails.OptionalIntegerKind != nil {
inMemoryCol.KindDetails.OptionalIntegerKind = foundColumn.KindDetails.OptionalIntegerKind
}
return nil
}

// Copy over the time details
if foundColumn.KindDetails.ExtendedTimeDetails != nil {
if inMemoryCol.KindDetails.ExtendedTimeDetails == nil {
inMemoryCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{}
}
// mergeColumn - This function will merge the in-memory column with the destination column.
func mergeColumn(inMemoryCol columns.Column, destCol columns.Column) columns.Column {
inMemoryCol.KindDetails.Kind = destCol.KindDetails.Kind
// Copy over backfilled
inMemoryCol.SetBackfilled(destCol.Backfilled())

// If the column in the destination is a timestamp_tz and the in-memory column is a timestamp_ntz, we should update the layout to contain timezone locale.
if foundColumn.KindDetails.ExtendedTimeDetails.Type == ext.TimestampTZKindType && inMemoryCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampNTZKindType {
if inMemoryCol.KindDetails.ExtendedTimeDetails.Format != "" {
inMemoryCol.KindDetails.ExtendedTimeDetails.Format += ext.TimezoneOffsetFormat
}
}
// Copy over string precision, if it exists
if destCol.KindDetails.OptionalStringPrecision != nil {
inMemoryCol.KindDetails.OptionalStringPrecision = destCol.KindDetails.OptionalStringPrecision
}

// Copy over the type
inMemoryCol.KindDetails.ExtendedTimeDetails.Type = foundColumn.KindDetails.ExtendedTimeDetails.Type
// If the in-memory column has no format, we should use the format from the destination.
inMemoryCol.KindDetails.ExtendedTimeDetails.Format = cmp.Or(inMemoryCol.KindDetails.ExtendedTimeDetails.Format, foundColumn.KindDetails.ExtendedTimeDetails.Format)
// Copy over integer kind, if exists.
if destCol.KindDetails.OptionalIntegerKind != nil {
inMemoryCol.KindDetails.OptionalIntegerKind = destCol.KindDetails.OptionalIntegerKind
}

}
// Copy over the time details
if destCol.KindDetails.ExtendedTimeDetails != nil {
if inMemoryCol.KindDetails.ExtendedTimeDetails == nil {
inMemoryCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{}
}

// Copy over the decimal details
if foundColumn.KindDetails.ExtendedDecimalDetails != nil && inMemoryCol.KindDetails.ExtendedDecimalDetails == nil {
inMemoryCol.KindDetails.ExtendedDecimalDetails = foundColumn.KindDetails.ExtendedDecimalDetails
// If the column in the destination is a timestamp_tz and the in-memory column is a timestamp_ntz, we should update the layout to contain timezone locale.
if destCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampTZKindType && inMemoryCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampNTZKindType {
if inMemoryCol.KindDetails.ExtendedTimeDetails.Format != "" {
inMemoryCol.KindDetails.ExtendedTimeDetails.Format += ext.TimezoneOffsetFormat
}

t.inMemoryColumns.UpdateColumn(inMemoryCol)
}

// Copy over the type
inMemoryCol.KindDetails.ExtendedTimeDetails.Type = destCol.KindDetails.ExtendedTimeDetails.Type
// If the in-memory column has no format, we should use the format from the destination.
inMemoryCol.KindDetails.ExtendedTimeDetails.Format = cmp.Or(inMemoryCol.KindDetails.ExtendedTimeDetails.Format, destCol.KindDetails.ExtendedTimeDetails.Format)

}

return nil
// Copy over the decimal details
if destCol.KindDetails.ExtendedDecimalDetails != nil && inMemoryCol.KindDetails.ExtendedDecimalDetails == nil {
inMemoryCol.KindDetails.ExtendedDecimalDetails = destCol.KindDetails.ExtendedDecimalDetails
}

return inMemoryCol
}
65 changes: 62 additions & 3 deletions lib/optimization/table_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
)

func TestDistinctDates(t *testing.T) {
Expand Down Expand Up @@ -331,3 +332,61 @@ func TestTableData_InsertRowSoftDelete(t *testing.T) {
assert.Equal(t, false, td.Rows()[0][constants.DeleteColumnMarker])
}
}

// TestMergeColumn verifies that mergeColumn copies the destination column's kind, backfill flag,
// string precision, integer kind, decimal details and time details onto the in-memory column.
func TestMergeColumn(t *testing.T) {
	{
		// Make sure it copies the kind over
		col := mergeColumn(columns.NewColumn("foo", typing.String), columns.NewColumn("foo", typing.Boolean))
		assert.Equal(t, typing.Boolean, col.KindDetails)
	}
	{
		// Make sure it copies the backfill over
		backfilledCol := columns.NewColumn("foo", typing.String)
		backfilledCol.SetBackfilled(true)
		col := mergeColumn(columns.NewColumn("foo", typing.String), backfilledCol)
		assert.True(t, col.Backfilled())
	}
	{
		// Make sure the string precision gets copied over
		columnWithStringPrecision := columns.NewColumn("foo", typing.String)
		columnWithStringPrecision.KindDetails.OptionalStringPrecision = typing.ToPtr(int32(5))
		col := mergeColumn(columns.NewColumn("foo", typing.String), columnWithStringPrecision)
		assert.Equal(t, int32(5), *col.KindDetails.OptionalStringPrecision)
	}
	{
		// Integer kind gets copied over
		intCol := columns.NewColumn("foo", typing.Integer)
		intCol.KindDetails.OptionalIntegerKind = typing.ToPtr(typing.SmallIntegerKind)
		col := mergeColumn(columns.NewColumn("foo", typing.String), intCol)
		assert.Equal(t, typing.SmallIntegerKind, *col.KindDetails.OptionalIntegerKind)
	}
	{
		// Decimal details get copied over
		decimalCol := columns.NewColumn("foo", typing.EDecimal)
		details := decimal.NewDetails(5, 2)
		decimalCol.KindDetails.ExtendedDecimalDetails = &details

		col := mergeColumn(columns.NewColumn("foo", typing.String), decimalCol)
		assert.Equal(t, details, *col.KindDetails.ExtendedDecimalDetails)
	}
	{
		// Time details get copied over
		{
			// Testing for backwards compatibility
			// in-memory column is TimestampNTZ, destination column is TimestampTZ
			// The in-memory column must really be NTZ, otherwise the branch of mergeColumn that appends
			// ext.TimezoneOffsetFormat to the layout is never exercised.
			// NOTE(review): the expected layout below assumes the default NTZ layout is ext.RFC3339NoTZ - confirm.
			timestampNTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ""))
			timestampTZColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))
			col := mergeColumn(timestampNTZColumn, timestampTZColumn)
			assert.Equal(t, ext.TimestampTZKindType, col.KindDetails.ExtendedTimeDetails.Type)
			assert.Equal(t, "2006-01-02T15:04:05.999999999Z07:00", col.KindDetails.ExtendedTimeDetails.Format)
		}
		{
			// Copy the dest column format if in-mem column format is empty.
			inMemoryColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))
			// Clearing the format
			inMemoryColumn.KindDetails.ExtendedTimeDetails.Format = ""
			destinationColumn := columns.NewColumn("foo", typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""))
			assert.Equal(t, destinationColumn.KindDetails.ExtendedTimeDetails.Format, mergeColumn(inMemoryColumn, destinationColumn).KindDetails.ExtendedTimeDetails.Format)
		}
	}
}
17 changes: 1 addition & 16 deletions lib/typing/ext/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func NewNestedKind(kindType ExtendedTimeKindType, optionalFormat string) (Nested
// ExtendedTime is created because Golang's time.Time does not allow us to explicitly cast values as a date, or time
// and only allows timestamp expressions.
type ExtendedTime struct {
ts time.Time
nestedKind NestedKind
ts time.Time
}

// MarshalJSON is a custom JSON marshaller for ExtendedTime.
Expand All @@ -58,26 +57,12 @@ func (e ExtendedTime) MarshalJSON() ([]byte, error) {
return e.ts.UTC().MarshalJSON()
}

// TODO: Have this return an error instead of nil
func NewExtendedTime(t time.Time, kindType ExtendedTimeKindType, originalFormat string) *ExtendedTime {
defaultLayout, err := kindType.defaultLayout()
if err != nil {
return nil
}

return &ExtendedTime{
ts: t,
nestedKind: NestedKind{
Type: kindType,
Format: cmp.Or(originalFormat, defaultLayout),
},
}
}

// GetTime returns the time.Time value stored in this ExtendedTime.
func (e *ExtendedTime) GetTime() time.Time {
return e.ts
}

func (e *ExtendedTime) GetNestedKind() NestedKind {
return e.nestedKind
}
4 changes: 3 additions & 1 deletion lib/typing/ext/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ const TimezoneOffsetFormat = "Z07:00"

// RFC3339 variants
const (
RFC3339NoTZ = "2006-01-02T15:04:05.999999999"
RFC3339NoTZ = "2006-01-02T15:04:05.999999999"
RFC3339 = "2006-01-02T15:04:05.999999999" + TimezoneOffsetFormat

RFC3339MillisecondUTC = "2006-01-02T15:04:05.000Z"
RFC3339MicrosecondUTC = "2006-01-02T15:04:05.000000Z"
RFC3339NanosecondUTC = "2006-01-02T15:04:05.000000000Z"
Expand Down
6 changes: 2 additions & 4 deletions lib/typing/mongo/bson.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/artie-labs/transfer/lib/typing/ext"
)

// JSONEToMap - Takes a JSON Extended string and converts it to a map[string]any
Expand Down Expand Up @@ -89,9 +87,9 @@ func bsonBinaryValueToMap(value primitive.Binary) (any, error) {
func bsonValueToGoValue(value any) (any, error) {
switch v := value.(type) {
case primitive.DateTime:
return ext.NewExtendedTime(v.Time().UTC(), ext.TimestampTZKindType, ext.ISO8601), nil
return v.Time().UTC(), nil
case primitive.Timestamp:
return ext.NewExtendedTime(time.Unix(int64(v.T), 0).UTC(), ext.TimestampTZKindType, ext.ISO8601), nil
return time.Unix(int64(v.T), 0).UTC(), nil
case primitive.ObjectID:
return v.Hex(), nil
case primitive.Binary:
Expand Down
18 changes: 7 additions & 11 deletions lib/typing/mongo/bson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,13 @@ func TestJSONEToMap(t *testing.T) {
}
{
// Date
extendedTime, isOk := result["order_date"].(*ext.ExtendedTime)
assert.True(t, isOk)
assert.Equal(t, ext.NewExtendedTime(time.Date(2016, time.February, 21, 0, 0, 0, 0, time.UTC), ext.TimestampTZKindType, ext.ISO8601), extendedTime)
assert.Equal(t, time.Date(2016, time.February, 21, 0, 0, 0, 0, time.UTC), result["order_date"].(time.Time))
}
{
// Timestamp
extendedTime, isOk := result["test_timestamp"]
assert.True(t, isOk)
assert.Equal(t, ext.NewExtendedTime(time.Date(2023, time.March, 16, 1, 18, 37, 0, time.UTC), ext.TimestampTZKindType, ext.ISO8601), extendedTime)
assert.Equal(t, "2023-03-16T01:18:37+00:00", extendedTime.(*ext.ExtendedTime).GetTime().Format(ext.ISO8601))
_ts := result["test_timestamp"].(time.Time)
assert.Equal(t, time.Date(2023, time.March, 16, 1, 18, 37, 0, time.UTC), _ts)
assert.Equal(t, "2023-03-16T01:18:37+00:00", _ts.Format(ext.ISO8601))
}
{
// Boolean
Expand Down Expand Up @@ -220,10 +217,9 @@ func TestBsonValueToGoValue(t *testing.T) {
result, err := bsonValueToGoValue(dateTime)
assert.NoError(t, err)

extendedTime, isOk := result.(*ext.ExtendedTime)
assert.True(t, isOk)
assert.Equal(t, ext.NewExtendedTime(time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC), ext.TimestampTZKindType, ext.ISO8601), extendedTime)
assert.Equal(t, "2021-01-01T00:00:00+00:00", extendedTime.GetTime().Format(ext.ISO8601))
_ts := result.(time.Time)
assert.Equal(t, time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC), _ts)
assert.Equal(t, "2021-01-01T00:00:00+00:00", _ts.Format(ext.ISO8601))
}
{
// primitive.ObjectID
Expand Down
11 changes: 5 additions & 6 deletions lib/typing/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package typing
import (
"fmt"
"reflect"
"time"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
Expand Down Expand Up @@ -48,12 +49,10 @@ func ParseValue(key string, optionalSchema map[string]KindDetails, val any) (Kin
Kind: EDecimal.Kind,
ExtendedDecimalDetails: &extendedDetails,
}, nil
case *ext.ExtendedTime:
nestedKind := convertedVal.GetNestedKind()
return KindDetails{
Kind: ETime.Kind,
ExtendedTimeDetails: &nestedKind,
}, nil
case time.Time:
// time.Time should only appear for bson.Timestamp and bson.DateTime
// All other sources will have an optional schema
return NewExtendedTimeDetails(ETime, ext.TimestampTZKindType, "")
default:
// Check if the val is one of our custom-types
if reflect.TypeOf(val).Kind() == reflect.Slice {
Expand Down