Skip to content

Commit

Permalink
[BigQuery] Removing Legacy API (#729)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 17, 2024
1 parent a320314 commit df9dd9b
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 286 deletions.
49 changes: 1 addition & 48 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
useStorageWriteAPI = true
)

type Store struct {
Expand Down Expand Up @@ -72,33 +71,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}

// Load the data
if useStorageWriteAPI {
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
} else {
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
// Cast the data into BigQuery values
var rows []*Row
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, value := range tableData.Rows() {
data := make(map[string]bigquery.Value)
for _, col := range columns {
colVal, err := castColVal(value[col.Name()], col, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err)
}

if colVal != nil {
data[col.Name()] = colVal
}
}

rows = append(rows, NewRow(data))
}
return rows, nil
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
}

func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
Expand Down Expand Up @@ -146,26 +119,6 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
return client
}

// putTableViaLegacyAPI loads tableData into the table identified by tableID
// using the BigQuery client's Inserter.
//
// The rows are first built with buildLegacyRows (using the additional date
// formats from the store's typing settings), then split into chunks via
// NewBatch with the store's batch size, and each chunk is sent with a separate
// Put call. The first failing Put aborts the load and its error is returned
// wrapped; a row-building error is returned as-is.
func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error {
	additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
	legacyRows, buildErr := buildLegacyRows(tableData, additionalDateFmts)
	if buildErr != nil {
		return buildErr
	}

	// A client is obtained for this load only and closed when the function returns.
	bqClient := s.GetClient(ctx)
	defer bqClient.Close()

	pending := NewBatch(legacyRows, s.batchSize)
	targetTable := bqClient.Dataset(tableID.Dataset()).Table(tableID.Table())
	rowInserter := targetTable.Inserter()

	for pending.HasNext() {
		putErr := rowInserter.Put(ctx, pending.NextChunk())
		if putErr != nil {
			return fmt.Errorf("failed to insert rows: %w", putErr)
		}
	}

	return nil
}

func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error {
columns := tableData.ReadOnlyInMemoryCols().ValidColumns()

Expand Down
82 changes: 0 additions & 82 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,91 +6,9 @@ import (
"log/slog"
"strings"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

// castColVal converts one column value into the form placed into a legacy
// BigQuery row, based on the kind recorded on colKind.
//
// A nil colVal yields (nil, nil). A nil value with a nil error is also
// returned for date and datetime values whose year is 0, for structs that
// encode to an empty string, and for arrays that convert to zero elements.
// additionalDateFmts is passed through to ext.ParseFromInterface when the
// column is a time kind.
//
// Kinds (and extended time types) with no branch below are not rejected: they
// are logged via slog.Error and returned as fmt.Sprint(colVal).
func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) {
	if colVal == nil {
		return nil, nil
	}

	details := colKind.KindDetails
	switch details.Kind {
	case typing.String.Kind:
		// Decimals landing in a string column are written in their string form.
		if dec, ok := colVal.(*decimal.Decimal); ok {
			return dec.String(), nil
		}
		return colVal, nil

	case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind:
		return colVal, nil

	case typing.EDecimal.Kind:
		dec, ok := colVal.(*decimal.Decimal)
		if ok {
			return dec.Value(), nil
		}
		return nil, fmt.Errorf("colVal is not type *decimal.Decimal")

	case typing.ETime.Kind:
		parsed, parseErr := ext.ParseFromInterface(colVal, additionalDateFmts)
		if parseErr != nil {
			return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, parseErr)
		}

		timeDetails := details.ExtendedTimeDetails
		if timeDetails == nil {
			return nil, fmt.Errorf("column kind details for extended time details is null")
		}

		// We should be using the colKind here since the data types coming from the source may be inconsistent.
		// https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data
		if timeType := timeDetails.Type; timeType == ext.DateTimeKindType {
			if parsed.Year() == 0 {
				return nil, nil
			}
			return parsed.StringUTC(ext.BigQueryDateTimeFormat), nil
		} else if timeType == ext.DateKindType {
			if parsed.Year() == 0 {
				return nil, nil
			}
			return parsed.String(ext.PostgresDateFormat), nil
		} else if timeType == ext.TimeKindType {
			return parsed.String(dialect.BQStreamingTimeFormat), nil
		}
		// Any other time type drops out to the logging fallback at the bottom.

	case typing.Struct.Kind:
		encoded, encodeErr := EncodeStructToJSONString(colVal)
		if encodeErr != nil {
			return nil, encodeErr
		}
		if encoded == "" {
			return nil, nil
		}
		return encoded, nil

	case typing.Array.Kind:
		elements, convErr := array.InterfaceToArrayString(colVal, true)
		if convErr != nil {
			return nil, convErr
		}
		if len(elements) == 0 {
			return nil, nil
		}
		return elements, nil
	}

	// TODO: Change this to return an error once we don't see Sentry
	slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", details.Kind), slog.Any("colVal", colVal))
	return fmt.Sprint(colVal), nil
}

// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string.
// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
Expand Down
156 changes: 0 additions & 156 deletions clients/bigquery/cast_test.go
Original file line number Diff line number Diff line change
@@ -1,167 +1,11 @@
package bigquery

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/decimal"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"
)

// TestCastColVal exercises castColVal once per supported column kind
// (string, integer, float, boolean, decimal, time, struct, array) and checks
// both the returned value and that no error is reported.
func (b *BigQueryTestSuite) TestCastColVal() {
	t := b.T()

	// Every call below reuses the same result pair.
	var (
		got any
		err error
	)

	// --- Strings ---
	got, err = castColVal("hello", columns.Column{KindDetails: typing.String}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "hello", got)

	// A decimal stored in a string column comes back as its string form.
	strDec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45))
	got, err = castColVal(strDec, columns.Column{KindDetails: typing.String}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "123.45", got)

	// --- Integers ---
	got, err = castColVal(5, columns.Column{KindDetails: typing.Integer}, nil)
	assert.NoError(t, err)
	assert.Equal(t, 5, got)

	// --- Floats ---
	got, err = castColVal(5.55, columns.Column{KindDetails: typing.Float}, nil)
	assert.NoError(t, err)
	assert.Equal(t, 5.55, got)

	// --- Booleans ---
	got, err = castColVal(true, columns.Column{KindDetails: typing.Boolean}, nil)
	assert.NoError(t, err)
	assert.True(t, got.(bool))

	// --- EDecimals ---
	// Native type is big.Float if precision doesn't exceed the DWH limit
	smallDec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45))
	got, err = castColVal(smallDec, columns.Column{KindDetails: typing.EDecimal}, nil)
	assert.NoError(t, err)
	assert.Equal(t, big.NewFloat(123.45), got)

	// Precision has clearly exceeded the limit, so we should be returning a string
	largeDec := decimal.NewDecimal(ptr.ToInt(50), 2, big.NewFloat(123.45))
	got, err = castColVal(largeDec, columns.Column{KindDetails: typing.EDecimal}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "123.45", got)

	// --- ETime ---
	birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)

	tsKind := typing.ETime
	tsKind.ExtendedTimeDetails = &ext.DateTime

	dateKind := typing.ETime
	dateKind.ExtendedTimeDetails = &ext.Date

	timeKind := typing.ETime
	timeKind.ExtendedTimeDetails = &ext.Time

	birthdayTSExt := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "")

	// Timestamp
	got, err = castColVal(birthdayTSExt, columns.Column{KindDetails: tsKind}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "2022-09-06 03:19:24.942", got)

	// Date
	birthdayDateExt := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "")
	got, err = castColVal(birthdayDateExt, columns.Column{KindDetails: dateKind}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "2022-09-06", got)

	// Date (column is a date, but value is not)
	got, err = castColVal(birthdayTSExt, columns.Column{KindDetails: dateKind}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "2022-09-06", got)

	// Time
	birthdayTimeExt := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "")
	got, err = castColVal(birthdayTimeExt, columns.Column{KindDetails: timeKind}, nil)
	assert.NoError(t, err)
	assert.Equal(t, "03:19:24", got)

	// Values whose year is 0 are treated as invalid and come back as nil.
	invalidDate := time.Date(0, time.September, 6, 3, 19, 24, 942000000, time.UTC)
	invalidDateTsExt := ext.NewExtendedTime(invalidDate, tsKind.ExtendedTimeDetails.Type, "")

	// Date (column is a date, but value is invalid)
	got, err = castColVal(invalidDateTsExt, columns.Column{KindDetails: dateKind}, nil)
	assert.NoError(t, err)
	assert.Nil(t, got)

	// Datetime (column is datetime but value is invalid)
	got, err = castColVal(invalidDateTsExt, columns.Column{KindDetails: tsKind}, nil)
	assert.NoError(t, err)
	assert.Nil(t, got)

	// --- Structs ---
	got, err = castColVal(map[string]any{"hello": "world"}, columns.Column{KindDetails: typing.Struct}, nil)
	assert.NoError(t, err)
	assert.Equal(t, `{"hello":"world"}`, got)

	// With string values
	got, err = castColVal(`{"hello":"world"}`, columns.Column{KindDetails: typing.Struct}, nil)
	assert.NoError(t, err)
	assert.Equal(t, `{"hello":"world"}`, got)

	// With empty string
	got, err = castColVal("", columns.Column{KindDetails: typing.Struct}, nil)
	assert.NoError(t, err)
	assert.Nil(t, got)

	// With array
	got, err = castColVal([]any{map[string]any{}, map[string]any{"hello": "world"}}, columns.Column{KindDetails: typing.Struct}, nil)
	assert.NoError(t, err)
	assert.Equal(t, `[{},{"hello":"world"}]`, got)

	// With TOAST values
	got, err = castColVal(constants.ToastUnavailableValuePlaceholder, columns.Column{KindDetails: typing.Struct}, nil)
	assert.NoError(t, err)
	assert.Equal(t, fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), got)

	// --- Arrays ---
	got, err = castColVal([]any{1, 2, 3, 4, 5}, columns.Column{KindDetails: typing.Array}, nil)
	assert.NoError(t, err)
	assert.Equal(t, []string{"1", "2", "3", "4", "5"}, got)

	// Empty array
	got, err = castColVal([]any{}, columns.Column{KindDetails: typing.Array}, nil)
	assert.NoError(t, err)
	assert.Nil(t, got)

	// Null array
	got, err = castColVal(nil, columns.Column{KindDetails: typing.Array}, nil)
	assert.NoError(t, err)
	assert.Nil(t, got)
}

func TestEncodeStructToJSONString(t *testing.T) {
{
// Empty string:
Expand Down

0 comments on commit df9dd9b

Please sign in to comment.