Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BigQuery] Supporting TIMESTAMP_NTZ #985

Merged
merged 15 commits into from
Oct 24, 2024
46 changes: 40 additions & 6 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchem
fieldType = storagepb.TableFieldSchema_DATE
case ext.TimestampTZKindType:
fieldType = storagepb.TableFieldSchema_TIMESTAMP
case ext.TimestampNTZKindType:
fieldType = storagepb.TableFieldSchema_DATETIME
default:
return nil, fmt.Errorf("unsupported extended time details type: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down Expand Up @@ -89,15 +91,45 @@ func columnsToMessageDescriptor(cols []columns.Column) (*protoreflect.MessageDes
return &messageDescriptor, nil
}

// Bit positions used when packing civil date/time fields into one integer.
// The *Shift constants are offsets inside the seconds-resolution bit field
// built by encodePacked32TimeSeconds and encodePacked64DatetimeSeconds.
const (
// microLength is how far a seconds-resolution bit field is shifted left so
// the microsecond-of-second can be OR-ed into the low bits.
microLength = 20
// Offsets of the time-of-day fields.
secondShift = 0
minuteShift = 6
hourShift = 12
// Offsets of the calendar fields.
dayShift = 17
monthShift = 22
yearShift = 26
)

// encodePacked64TimeMicros packs the time-of-day of value into a single int64.
//
// The low microLength bits carry the microsecond-of-second (nanoseconds
// truncated to whole microseconds); the bits above carry the
// hour/minute/second bit field returned by encodePacked32TimeSeconds. The
// clock fields are read as value reports them; no time zone conversion is
// applied here.
//
// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L143
// See https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.CivilTimeEncoder
// And https://cloud.google.com/pubsub/docs/bigquery#date_time_int
func encodePacked64TimeMicros(value time.Time) int64 {
	micros := int64(value.Nanosecond() / 1000)
	// Widen to int64 before shifting so the hour bits are not lost.
	return int64(encodePacked32TimeSeconds(value))<<microLength | micros
}

// encodePacked32TimeSeconds packs the hour, minute and second of t into an
// int32 bit field, placing each component at hourShift, minuteShift and
// secondShift respectively. Sub-second precision is ignored, and the clock
// is read in whatever location t carries.
//
// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L92
func encodePacked32TimeSeconds(t time.Time) int32 {
	hour, minute, second := t.Clock()
	return int32(hour)<<hourShift | int32(minute)<<minuteShift | int32(second)<<secondShift
}

// encodePacked64DatetimeSeconds packs the calendar date and time-of-day of
// dateTime into an int64 bit field at one-second resolution.
//
// dateTime is converted to UTC once, and every field (year, month, day and
// the clock fields) is taken from that same UTC value. Reading the date in
// the original location while reading the clock in UTC would produce a
// mixed encoding whenever the two fall on different calendar days.
//
// Each component is widened to int64 before shifting so the year does not
// overflow on platforms where int is 32 bits.
//
// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L187
func encodePacked64DatetimeSeconds(dateTime time.Time) int64 {
	utc := dateTime.UTC()
	year, month, day := utc.Date()

	packed := int64(year) << yearShift
	packed |= int64(month) << monthShift
	packed |= int64(day) << dayShift
	packed |= int64(encodePacked32TimeSeconds(utc))
	return packed
}

// encodePacked64DatetimeMicros packs dateTime into an int64 at microsecond
// resolution: the bit field from encodePacked64DatetimeSeconds is shifted
// left by microLength and the microsecond-of-second (nanoseconds truncated
// to whole microseconds) fills the low bits.
//
// This is a reimplementation of https://github.com/googleapis/java-bigquerystorage/blob/f79acb5cfdd12253bca1c41551c478400120d2f9/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java#L248
func encodePacked64DatetimeMicros(dateTime time.Time) int64 {
	seconds := encodePacked64DatetimeSeconds(dateTime)
	micros := int64(dateTime.Nanosecond() / 1000)
	return seconds<<microLength | micros
}

func rowToMessage(row map[string]any, columns []columns.Column, messageDescriptor protoreflect.MessageDescriptor) (*dynamicpb.Message, error) {
Expand Down Expand Up @@ -186,10 +218,12 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
daysSinceEpoch := _time.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case ext.TimestampTZKindType:
if err := timestamppb.New(_time).CheckValid(); err != nil {
if err = timestamppb.New(_time).CheckValid(); err != nil {
return nil, err
}
message.Set(field, protoreflect.ValueOfInt64(_time.UnixMicro()))
case ext.TimestampNTZKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64DatetimeMicros(_time)))
default:
return nil, fmt.Errorf("unsupported extended time details: %q", column.KindDetails.ExtendedTimeDetails.Type)
}
Expand Down
52 changes: 52 additions & 0 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,58 @@ func TestEncodePacked64TimeMicros(t *testing.T) {
assert.Equal(t, int64(1<<32+1000), encodePacked64TimeMicros(epoch.Add(time.Duration(1)*time.Hour+time.Duration(1)*time.Millisecond)))
}

// TestEncodePacked32TimeSeconds checks that seconds, minutes and hours land
// at bit offsets 0, 6 and 12 of the packed value.
func TestEncodePacked32TimeSeconds(t *testing.T) {
	// Only the 00:00:00 time-of-day of this value matters to the encoder under test.
	midnight := time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC)

	cases := []struct {
		offset   time.Duration
		expected int32
	}{
		{offset: 0, expected: 0},
		{offset: time.Second, expected: 1},
		{offset: time.Minute, expected: 1 << 6},
		{offset: time.Hour, expected: 1 << 12},
		{offset: time.Hour + time.Second, expected: 1<<12 + 1},
	}

	for _, tc := range cases {
		assert.Equal(t, tc.expected, encodePacked32TimeSeconds(midnight.Add(tc.offset)))
	}
}

// TestEncodePacked64DatetimeSeconds checks that advancing one calendar or
// clock component of a reference timestamp bumps exactly the matching bit
// field of the packed value.
func TestEncodePacked64DatetimeSeconds(t *testing.T) {
	ts := time.Date(2024, 10, 24, 13, 1, 2, 3000000, time.UTC)
	base := 2024<<26 + 10<<22 + 24<<17 + int64(encodePacked32TimeSeconds(ts))

	cases := []struct {
		input time.Time
		delta int64
	}{
		// Time
		{input: ts, delta: 0},
		{input: ts.Add(time.Second), delta: 1 << 0},
		{input: ts.Add(time.Minute), delta: 1 << 6},
		{input: ts.Add(time.Hour), delta: 1 << 12},
		{input: ts.Add(time.Hour + time.Second), delta: 1<<12 + 1<<0},
		// Day
		{input: ts.Add(24 * time.Hour), delta: 1 << 17},
		// Month
		{input: ts.AddDate(0, 1, 0), delta: 1 << 22},
		// Year
		{input: ts.AddDate(1, 0, 0), delta: 1 << 26},
		// Month and year
		{input: ts.AddDate(1, 1, 0), delta: 1<<22 + 1<<26},
	}

	for _, tc := range cases {
		assert.Equal(t, base+tc.delta, encodePacked64DatetimeSeconds(tc.input))
	}
}

// TestEncodePacked64DatetimeMicros checks that the microsecond-resolution
// encoding is the seconds encoding shifted left by 20 bits with the
// microseconds in the low bits, and that advancing a single component bumps
// only the matching (shifted) bit field.
func TestEncodePacked64DatetimeMicros(t *testing.T) {
	ts := time.Date(2024, 10, 24, 13, 1, 2, 123456789, time.UTC)
	base := encodePacked64DatetimeSeconds(ts)<<20 | int64(ts.Nanosecond()/1000)

	cases := []struct {
		input time.Time
		delta int64
	}{
		// Time
		{input: ts, delta: 0},
		{input: ts.Add(time.Second), delta: 1 << (0 + 20)},
		{input: ts.Add(time.Minute), delta: 1 << (6 + 20)},
		{input: ts.Add(time.Hour), delta: 1 << (12 + 20)},
		{input: ts.Add(time.Hour + time.Second), delta: 1<<(12+20) + 1<<(0+20)},
		// Day
		{input: ts.Add(24 * time.Hour), delta: 1 << (17 + 20)},
		// Month
		{input: ts.AddDate(0, 1, 0), delta: 1 << (22 + 20)},
		// Year
		{input: ts.AddDate(1, 0, 0), delta: 1 << (26 + 20)},
		// Month and year
		{input: ts.AddDate(1, 1, 0), delta: 1<<(26+20) + 1<<(22+20)},
	}

	for _, tc := range cases {
		assert.Equal(t, base+tc.delta, encodePacked64DatetimeMicros(tc.input))
	}
}

func TestRowToMessage(t *testing.T) {
columns := []columns.Column{
columns.NewColumn("c_bool", typing.Boolean),
Expand Down