Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bigquery] Add scaffolding for the Storage Write API #720

Merged
merged 8 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}

// Load the data
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
if s.config.BigQuery.UseStorageWriteAPI {
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
} else {
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
Expand Down Expand Up @@ -158,6 +162,10 @@ func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifie
return nil
}

// putTableViaStorageWriteAPI is the Storage Write API counterpart to putTableViaLegacyAPI.
// PrepareTemporaryTable dispatches here instead of to the legacy path when
// config.BigQuery.UseStorageWriteAPI is set.
//
// This is scaffolding only: the body is not implemented yet, so every call panics
// with "not implemented" and none of the arguments are read.
func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error {
panic("not implemented")
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

Expand Down
50 changes: 30 additions & 20 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
return extTime.String(dialect.BQStreamingTimeFormat), nil
}
case typing.Struct.Kind:
// TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}

// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
if colValString, isOk := colVal.(string); isOk {
if colValString == "" {
return nil, nil
}

return colValString, nil
}

colValBytes, err := json.Marshal(colVal)
stringValue, err := EncodeStructToJSONString(colVal)
if err != nil {
return nil, fmt.Errorf("failed to marshal colVal: %w", err)
return nil, err
} else if stringValue == "" {
return nil, nil
} else {
return stringValue, nil
}

return string(colValBytes), nil
case typing.Array.Kind:
arrayString, err := array.InterfaceToArrayString(colVal, true)
if err != nil {
Expand All @@ -104,3 +90,27 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal))
return fmt.Sprint(colVal), nil
}

// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string.
// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
func EncodeStructToJSONString(value any) (string, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need this for the Storage Write API.

if stringValue, isOk := value.(string); isOk {
if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}
return stringValue, nil
}

bytes, err := json.Marshal(value)
if err != nil {
return "", fmt.Errorf("failed to marshal colVal: %w", err)
}

stringValue := string(bytes)
if strings.Contains(stringValue, constants.ToastUnavailableValuePlaceholder) {
slog.Error("encoded JSON value contains the toast unavailable value placeholder")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return this too

return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil

}
return stringValue, nil
}
22 changes: 22 additions & 0 deletions clients/bigquery/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bigquery
import (
"fmt"
"math/big"
"testing"
"time"

"github.com/artie-labs/transfer/lib/ptr"
Expand Down Expand Up @@ -160,3 +161,24 @@ func (b *BigQueryTestSuite) TestCastColVal() {
assert.Nil(b.T(), colVal)
}
}

// TestEncodeStructToJSONString checks the encoder against each input shape it accepts:
// an empty string, a string holding the toast placeholder, and a native Go map.
func TestEncodeStructToJSONString(t *testing.T) {
	testCases := []struct {
		input    any
		expected string
	}{
		// Empty string is passed through untouched.
		{input: "", expected: ""},
		// Toasted string is replaced by the placeholder object.
		{input: "__debezium_unavailable_value", expected: `{"key":"__debezium_unavailable_value"}`},
		// Map is marshalled to JSON (keys come out sorted).
		{input: map[string]any{"foo": "bar", "baz": 1234}, expected: `{"baz":1234,"foo":"bar"}`},
	}

	for _, testCase := range testCases {
		actual, err := EncodeStructToJSONString(testCase.input)
		assert.NoError(t, err)
		assert.Equal(t, testCase.expected, actual)
	}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
11 changes: 6 additions & 5 deletions lib/config/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import "fmt"
type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet.
Copy link
Contributor Author

@nathan-artie nathan-artie Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For config settings that are not intended for production use, I think it's nice to prefix them with "__" to indicate that they are not officially supported. That way we can get rid of them without breaking backwards compatibility and having to bump the minor version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

}

func (b *BigQuery) LoadDefaultValues() {
Expand Down