Skip to content

Commit

Permalink
[bigquery] Add scaffolding for the Storage Write API
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 13, 2024
1 parent 2539f85 commit a28c02d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 29 deletions.
10 changes: 9 additions & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}

// Load the data
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
if s.config.BigQuery.UseStorageWriteAPI {
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
} else {
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
}
}

func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) {
Expand Down Expand Up @@ -158,6 +162,10 @@ func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifie
return nil
}

func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error {
panic("not implemented")
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

Expand Down
46 changes: 26 additions & 20 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
return extTime.String(dialect.BQStreamingTimeFormat), nil
}
case typing.Struct.Kind:
// TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}

// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
if colValString, isOk := colVal.(string); isOk {
if colValString == "" {
return nil, nil
}

return colValString, nil
}

colValBytes, err := json.Marshal(colVal)
stringValue, err := EncodeStructToJSONString(colVal)
if err != nil {
return nil, fmt.Errorf("failed to marshal colVal: %w", err)
return nil, err
} else if stringValue == "" {
return nil, nil
} else {
return stringValue, nil
}

return string(colValBytes), nil
case typing.Array.Kind:
arrayString, err := array.InterfaceToArrayString(colVal, true)
if err != nil {
Expand All @@ -104,3 +90,23 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal))
return fmt.Sprint(colVal), nil
}

// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string.
// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
func EncodeStructToJSONString(value any) (string, error) {
if colValString, isOk := value.(string); isOk {
if strings.Contains(colValString, constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}
return colValString, nil
}

colValBytes, err := json.Marshal(value)
if err != nil {
return "", fmt.Errorf("failed to marshal colVal: %w", err)
}

return string(colValBytes), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
11 changes: 6 additions & 5 deletions lib/config/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import "fmt"
type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet.
}

func (b *BigQuery) LoadDefaultValues() {
Expand Down

0 comments on commit a28c02d

Please sign in to comment.