diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 78ea841e6..80bda585e 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -34,7 +34,6 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" - useStorageWriteAPI = true ) type Store struct { @@ -72,33 +71,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - if useStorageWriteAPI { - return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) - } else { - return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) - } -} - -func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) { - // Cast the data into BigQuery values - var rows []*Row - columns := tableData.ReadOnlyInMemoryCols().ValidColumns() - for _, value := range tableData.Rows() { - data := make(map[string]bigquery.Value) - for _, col := range columns { - colVal, err := castColVal(value[col.Name()], col, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err) - } - - if colVal != nil { - data[col.Name()] = colVal - } - } - - rows = append(rows, NewRow(data)) - } - return rows, nil + return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { @@ -146,26 +119,6 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } -func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifier, tableData *optimization.TableData) error { - rows, err := buildLegacyRows(tableData, s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats) - if err != nil { - return err - } - - client := s.GetClient(ctx) - defer client.Close() - - batch := NewBatch(rows, s.batchSize) - inserter := client.Dataset(tableID.Dataset()).Table(tableID.Table()).Inserter() - for batch.HasNext() { - if err := inserter.Put(ctx, batch.NextChunk()); err != nil { - return fmt.Errorf("failed to insert rows: %w", err) - } - } - - return nil -} - func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { columns := tableData.ReadOnlyInMemoryCols().ValidColumns() diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go deleted file mode 100644 index b81ee1990..000000000 --- a/clients/bigquery/cast.go +++ /dev/null @@ -1,118 +0,0 @@ -package bigquery - -import ( - "encoding/json" - "fmt" - "log/slog" - "strings" - - "github.com/artie-labs/transfer/clients/bigquery/dialect" - "github.com/artie-labs/transfer/lib/array" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/artie-labs/transfer/lib/typing/ext" -) - -func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) { - if colVal == nil { - return nil, nil - } - - switch colKind.KindDetails.Kind { - case typing.String.Kind: - if val, isOk := colVal.(*decimal.Decimal); isOk { - return val.String(), nil - } - - return colVal, nil - case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind: - return colVal, nil - case typing.EDecimal.Kind: - val, isOk := colVal.(*decimal.Decimal) - if !isOk { - return nil, fmt.Errorf("colVal is not type *decimal.Decimal") - } - - return val.Value(), nil - case typing.ETime.Kind: - extTime, err := ext.ParseFromInterface(colVal, additionalDateFmts) - if err != nil { - return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err) - } - - if colKind.KindDetails.ExtendedTimeDetails == nil { - return nil, fmt.Errorf("column kind details for extended time details is null") - } - - // We should be using the colKind here since the data types coming from the source may be inconsistent. - switch colKind.KindDetails.ExtendedTimeDetails.Type { - // https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data - case ext.DateTimeKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.StringUTC(ext.BigQueryDateTimeFormat), nil - case ext.DateKindType: - if extTime.Year() == 0 { - return nil, nil - } - - return extTime.String(ext.PostgresDateFormat), nil - case ext.TimeKindType: - return extTime.String(dialect.BQStreamingTimeFormat), nil - } - case typing.Struct.Kind: - stringValue, err := EncodeStructToJSONString(colVal) - if err != nil { - return nil, err - } else if stringValue == "" { - return nil, nil - } else { - return stringValue, nil - } - case typing.Array.Kind: - arrayString, err := array.InterfaceToArrayString(colVal, true) - if err != nil { - return nil, err - } - - if len(arrayString) == 0 { - return nil, nil - } - - return arrayString, nil - } - - // TODO: Change this to return an error once we don't see Sentry - slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) - return fmt.Sprint(colVal), nil -} - -// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. -// Structs from relational and Mongo are different. -// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` -// Relational will return a string representation of the struct such as `{"hello": "world"}` -func EncodeStructToJSONString(value any) (string, error) { - if stringValue, isOk := value.(string); isOk { - if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - return stringValue, nil - } - - bytes, err := json.Marshal(value) - if err != nil { - return "", fmt.Errorf("failed to marshal value: %w", err) - } - - stringValue := string(bytes) - if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) { - // TODO: Remove this if we don't see it in the logs. - slog.Error("encoded JSON value contains the toast unavailable value placeholder") - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - return stringValue, nil -} diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go deleted file mode 100644 index 68d6222f2..000000000 --- a/clients/bigquery/cast_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package bigquery - -import ( - "fmt" - "math/big" - "testing" - "time" - - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/decimal" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/stretchr/testify/assert" -) - -func (b *BigQueryTestSuite) TestCastColVal() { - { - // Strings - colVal, err := castColVal("hello", columns.Column{KindDetails: typing.String}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "hello", colVal) - - // Decimal - dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) - colVal, err = castColVal(dec, columns.Column{KindDetails: typing.String}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "123.45", colVal) - } - { - // Integers - colVal, err := castColVal(5, columns.Column{KindDetails: typing.Integer}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5, colVal) - } - { - // Floats - colVal, err := castColVal(5.55, columns.Column{KindDetails: typing.Float}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), 5.55, colVal) - } - { - // Booleans - colVal, err := castColVal(true, columns.Column{KindDetails: typing.Boolean}, nil) - assert.NoError(b.T(), err) - assert.True(b.T(), colVal.(bool)) - } - { - // EDecimals - dec := decimal.NewDecimal(ptr.ToInt(5), 2, big.NewFloat(123.45)) - colVal, err := castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - - // Native type is big.Float if precision doesn't exceed the DWH limit - assert.Equal(b.T(), big.NewFloat(123.45), colVal) - - // Precision has clearly exceeded the limit, so we should be returning a string - dec = decimal.NewDecimal(ptr.ToInt(50), 2, big.NewFloat(123.45)) - colVal, err = castColVal(dec, columns.Column{KindDetails: typing.EDecimal}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "123.45", colVal) - } - { - // ETime - birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) - - tsKind := typing.ETime - tsKind.ExtendedTimeDetails = &ext.DateTime - - dateKind := typing.ETime - dateKind.ExtendedTimeDetails = &ext.Date - birthdayTSExt := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "") - { - // Timestamp - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: tsKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06 03:19:24.942", colVal) - } - { - // Date - birthdayDateExt := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayDateExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - - { - // Date (column is a date, but value is not) - colVal, err := castColVal(birthdayTSExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "2022-09-06", colVal) - } - { - // Time - timeKind := typing.ETime - timeKind.ExtendedTimeDetails = &ext.Time - birthdayTimeExt := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "") - colVal, err := castColVal(birthdayTimeExt, columns.Column{KindDetails: timeKind}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), "03:19:24", colVal) - } - - invalidDate := time.Date(0, time.September, 6, 3, 19, 24, 942000000, time.UTC) - invalidDateTsExt := ext.NewExtendedTime(invalidDate, tsKind.ExtendedTimeDetails.Type, "") - { - // Date (column is a date, but value is invalid) - colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: dateKind}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } - { - // Datetime (column is datetime but value is invalid) - colVal, err := castColVal(invalidDateTsExt, columns.Column{KindDetails: tsKind}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } - } - { - // Structs - colVal, err := castColVal(map[string]any{"hello": "world"}, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `{"hello":"world"}`, colVal) - - // With string values - colVal, err = castColVal(`{"hello":"world"}`, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `{"hello":"world"}`, colVal) - - // With empty string - colVal, err = castColVal("", columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - - // With array - colVal, err = castColVal([]any{map[string]any{}, map[string]any{"hello": "world"}}, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), `[{},{"hello":"world"}]`, colVal) - - // With TOAST values - colVal, err = castColVal(constants.ToastUnavailableValuePlaceholder, columns.Column{KindDetails: typing.Struct}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), colVal) - } - { - // Arrays - colVal, err := castColVal([]any{1, 2, 3, 4, 5}, columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Equal(b.T(), []string{"1", "2", "3", "4", "5"}, colVal) - - // Empty array - colVal, err = castColVal([]any{}, columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - - // Null array - colVal, err = castColVal(nil, columns.Column{KindDetails: typing.Array}, nil) - assert.NoError(b.T(), err) - assert.Nil(b.T(), colVal) - } -} - -func TestEncodeStructToJSONString(t *testing.T) { - { - // Empty string: - result, err := EncodeStructToJSONString("") - assert.NoError(t, err) - assert.Equal(t, "", result) - } - { - // Toasted string: - result, err := EncodeStructToJSONString("__debezium_unavailable_value") - assert.NoError(t, err) - assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) - } - { - // Map: - result, err := EncodeStructToJSONString(map[string]any{"foo": "bar", "baz": 1234}) - assert.NoError(t, err) - assert.Equal(t, `{"baz":1234,"foo":"bar"}`, result) - } - { - // Toasted map (should not happen): - result, err := EncodeStructToJSONString(map[string]any{"__debezium_unavailable_value": "bar", "baz": 1234}) - assert.NoError(t, err) - assert.Equal(t, `{"key":"__debezium_unavailable_value"}`, result) - } -}