Skip to content

Commit

Permalink
[BigQuery] Supporting TIMESTAMP_NTZ (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 24, 2024
1 parent 17bc676 commit 0ca75c0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
46 changes: 40 additions & 6 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_DATE
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case ext.TimestampNTZKindType:
fieldType = storagepb.TableFieldSchema_DATETIME
default:
return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down Expand Up @@ -89,15 +91,45 @@ func columnsToMessageDescriptor(cols []columns.Column) (*protoreflect.MessageDes
return &messageDescriptor, nil
}

const (
microLength = 20
secondShift = 0
minuteShift = 6
hourShift = 12
dayShift = 17
monthShift = 22
yearShift = 26
)

// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L143
// See https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.CivilTimeEncoder
// And https://cloud.google.com/pubsub/docs/bigquery#date_time_int
func encodePacked64TimeMicros(value time.Time) int64 {
var result = int64(value.Nanosecond() / 1000)
result |= int64(value.Second()) << 20
result |= int64(value.Minute()) << 26
result |= int64(value.Hour()) << 32
return result
return int64(encodePacked32TimeSeconds(value))<<microLength | int64(value.Nanosecond()/1000)
}

// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L92
func encodePacked32TimeSeconds(t time.Time) int32 {
var bitFieldTimeSeconds int32
bitFieldTimeSeconds |= int32(t.Hour()) << hourShift
bitFieldTimeSeconds |= int32(t.Minute()) << minuteShift
bitFieldTimeSeconds |= int32(t.Second()) << secondShift
return bitFieldTimeSeconds
}

// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L187
func encodePacked64DatetimeSeconds(dateTime time.Time) int64 {
var bitFieldDatetimeSeconds int64
bitFieldDatetimeSeconds |= int64(dateTime.Year() << yearShift)
bitFieldDatetimeSeconds |= int64(dateTime.Month() << monthShift)
bitFieldDatetimeSeconds |= int64(dateTime.Day() << dayShift)
bitFieldDatetimeSeconds |= int64(encodePacked32TimeSeconds(dateTime.UTC()))
return bitFieldDatetimeSeconds
}

// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L248
func encodePacked64DatetimeMicros(dateTime time.Time) int64 {
return encodePacked64DatetimeSeconds(dateTime)<<microLength | int64(dateTime.Nanosecond()/1000)
}

func rowToMessage(row map[string]any, columns []columns.Column, messageDescriptor protoreflect.MessageDescriptor) (*dynamicpb.Message, error) {
Expand Down Expand Up @@ -186,10 +218,12 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case ext.TimestampTZKindType:
if err := timestamppb.New(_time).CheckValid(); err != nil {
if err = timestamppb.New(_time).CheckValid(); err != nil {
return nil, err
}
message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
case ext.TimestampNTZKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
default:
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down
52 changes: 52 additions & 0 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,58 @@ func TestEncodePacked64TimeMicros(t *testing.T) {
assert.Equal(t, int64(1<<32+1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Millisecond)))
}

func TestEncodePacked32TimeSeconds(t *testing.T) {
epoch := time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC)

assert.Equal(t, int32(0), encodePacked32TimeSeconds(epoch))
assert.Equal(t, int32(1), encodePacked32TimeSeconds(epoch.Add(time.Duration(1)*time.Second)))
assert.Equal(t, int32(1<<6), encodePacked32TimeSeconds(epoch.Add(time.Duration(1)*time.Minute)))
assert.Equal(t, int32(1<<12), encodePacked32TimeSeconds(epoch.Add(time.Duration(1)*time.Hour)))
assert.Equal(t, int32(1<<12+1), encodePacked32TimeSeconds(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Second)))
}

func TestEncodePacked64DatetimeSeconds(t *testing.T) {
ts := time.Date(2024, 10, 24, 13, 1, 2, 3000000, time.UTC)
expected := 2024<<26 + 10<<22 + 24<<17 + int64(encodePacked32TimeSeconds(ts))

// Time
assert.Equal(t, expected, encodePacked64DatetimeSeconds(ts))
assert.Equal(t, expected+1<<0, encodePacked64DatetimeSeconds(ts.Add(time.Duration(1)*time.Second)))
assert.Equal(t, expected+1<<6, encodePacked64DatetimeSeconds(ts.Add(time.Duration(1)*time.Minute)))
assert.Equal(t, expected+1<<12, encodePacked64DatetimeSeconds(ts.Add(time.Duration(1)*time.Hour)))
assert.Equal(t, expected+1<<12+1<<0, encodePacked64DatetimeSeconds(ts.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Second)))

// Day
assert.Equal(t, expected+1<<17, encodePacked64DatetimeSeconds(ts.Add(time.Duration(24)*time.Hour)))
// Month
assert.Equal(t, expected+1<<22, encodePacked64DatetimeSeconds(ts.AddDate(0, 1, 0)))
// Year
assert.Equal(t, expected+1<<26, encodePacked64DatetimeSeconds(ts.AddDate(1, 0, 0)))
// Month and year
assert.Equal(t, expected+1<<22+1<<26, encodePacked64DatetimeSeconds(ts.AddDate(1, 1, 0)))
}

func TestEncodePacked64DatetimeMicros(t *testing.T) {
ts := time.Date(2024, 10, 24, 13, 1, 2, 123456789, time.UTC)
expected := encodePacked64DatetimeSeconds(ts)<<20 | int64(ts.Nanosecond()/1000)

// Time
assert.Equal(t, expected, encodePacked64DatetimeMicros(ts))
assert.Equal(t, expected+1<<(0+20), encodePacked64DatetimeMicros(ts.Add(time.Duration(1)*time.Second)))
assert.Equal(t, expected+1<<(6+20), encodePacked64DatetimeMicros(ts.Add(time.Duration(1)*time.Minute)))
assert.Equal(t, expected+1<<(12+20), encodePacked64DatetimeMicros(ts.Add(time.Duration(1)*time.Hour)))
assert.Equal(t, expected+1<<(12+20)+1<<(0+20), encodePacked64DatetimeMicros(ts.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Second)))

// Day
assert.Equal(t, expected+1<<(17+20), encodePacked64DatetimeMicros(ts.Add(time.Duration(24)*time.Hour)))
// Month
assert.Equal(t, expected+1<<(22+20), encodePacked64DatetimeMicros(ts.AddDate(0, 1, 0)))
// Year
assert.Equal(t, expected+1<<(26+20), encodePacked64DatetimeMicros(ts.AddDate(1, 0, 0)))
// Month and year
assert.Equal(t, expected+1<<(26+20)+1<<(22+20), encodePacked64DatetimeMicros(ts.AddDate(1, 1, 0)))
}

func TestRowToMessage(t *testing.T) {
columns := []columns.Column{
columns.NewColumn("c_bool", typing.Boolean),
Expand Down

0 comments on commit 0ca75c0

Please sign in to comment.