diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index bce9ff214..207d74719 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -68,7 +68,11 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) + if s.config.BigQuery.UseStorageWriteAPI { + return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) + } else { + return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) + } } func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []string) ([]*Row, error) { @@ -158,6 +162,10 @@ func (s *Store) putTableViaLegacyAPI(ctx context.Context, tableID TableIdentifie return nil } +func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableIdentifier, tableData *optimization.TableData) error { + panic("not implemented") +} + func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 7714f0876..cd0f535e2 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -65,28 +65,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) return extTime.String(dialect.BQStreamingTimeFormat), nil } case typing.Struct.Kind: - // TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164 - if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - - // Structs from relational and Mongo are different. - // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` - // Relational will return a string representation of the struct such as `{"hello": "world"}` - if colValString, isOk := colVal.(string); isOk { - if colValString == "" { - return nil, nil - } - - return colValString, nil - } - - colValBytes, err := json.Marshal(colVal) + stringValue, err := EncodeStructToJSONString(colVal) if err != nil { - return nil, fmt.Errorf("failed to marshal colVal: %w", err) + return nil, err + } else if stringValue == "" { + return nil, nil + } else { + return stringValue, nil } - - return string(colValBytes), nil case typing.Array.Kind: arrayString, err := array.InterfaceToArrayString(colVal, true) if err != nil { @@ -104,3 +90,23 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) return fmt.Sprint(colVal), nil } + +// EncodeStructToJSONString takes a struct as either a string or Go object and encodes it into a JSON string. +// Structs from relational and Mongo are different. +// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` +// Relational will return a string representation of the struct such as `{"hello": "world"}` +func EncodeStructToJSONString(value any) (string, error) { + if colValString, isOk := value.(string); isOk { + if strings.Contains(colValString, constants.ToastUnavailableValuePlaceholder) { + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + return colValString, nil + } + + colValBytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal colVal: %w", err) + } + + return string(colValBytes), nil +} diff --git a/go.mod b/go.mod index 16b172e5a..54da614d0 100644 --- a/go.mod +++ b/go.mod @@ -140,6 +140,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/grpc v1.63.2 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 3eef7ef73..1de7b64d5 100644 --- a/go.sum +++ b/go.sum @@ -810,8 +810,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index 6e8250daa..c1d5c123f 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -5,11 +5,12 @@ import "fmt" type BigQuery struct { // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC - PathToCredentials string `yaml:"pathToCredentials"` - DefaultDataset string `yaml:"defaultDataset"` - ProjectID string `yaml:"projectID"` - Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` + PathToCredentials string `yaml:"pathToCredentials"` + DefaultDataset string `yaml:"defaultDataset"` + ProjectID string `yaml:"projectID"` + Location string `yaml:"location"` + BatchSize int `yaml:"batchSize"` + UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet. } func (b *BigQuery) LoadDefaultValues() {