diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index c2863e374..3c80fca42 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -34,6 +34,7 @@ const ( describeNameCol = "column_name" describeTypeCol = "data_type" describeCommentCol = "description" + useStorageWriteAPI = true ) type Store struct { @@ -71,7 +72,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - if s.config.BigQuery.UseStorageWriteAPI { + if useStorageWriteAPI { return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData) } else { return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData) @@ -90,6 +91,7 @@ func buildLegacyRows(tableData *optimization.TableData, additionalDateFmts []str return nil, fmt.Errorf("failed to cast col %q: %w", col.Name(), err) } + fmt.Println("col.Name", col.Name(), "colKind", col.KindDetails, "colVal", colVal, fmt.Sprintf("colVal type: %T", colVal)) if colVal != nil { data[col.Name()] = colVal } diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index b81ee1990..81e027cdb 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -21,12 +21,17 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) } switch colKind.KindDetails.Kind { +<<<<<<< Updated upstream case typing.String.Kind: if val, isOk := colVal.(*decimal.Decimal); isOk { return val.String(), nil } return colVal, nil +======= + return fmt.Sprint(colVal), nil + +>>>>>>> Stashed changes case typing.Float.Kind, typing.Integer.Kind, typing.Boolean.Kind: return colVal, nil case typing.EDecimal.Kind: diff --git a/lib/cdc/relational/debezium.go b/lib/cdc/relational/debezium.go index 29731befa..1784da453 100644 --- a/lib/cdc/relational/debezium.go +++ b/lib/cdc/relational/debezium.go @@ -15,6 +15,8 @@ import ( type Debezium string func (d *Debezium) GetEventFromBytes(_ typing.Settings, bytes []byte) (cdc.Event, error) { + fmt.Println("string", string(bytes)) + var event util.SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index c1d5c123f..6e8250daa 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -5,12 +5,11 @@ import "fmt" type BigQuery struct { // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC - PathToCredentials string `yaml:"pathToCredentials"` - DefaultDataset string `yaml:"defaultDataset"` - ProjectID string `yaml:"projectID"` - Location string `yaml:"location"` - BatchSize int `yaml:"batchSize"` - UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet. + PathToCredentials string `yaml:"pathToCredentials"` + DefaultDataset string `yaml:"defaultDataset"` + ProjectID string `yaml:"projectID"` + Location string `yaml:"location"` + BatchSize int `yaml:"batchSize"` } func (b *BigQuery) LoadDefaultValues() { diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index 79d6515a0..6d4332fb1 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "log/slog" "maps" "sync" @@ -26,6 +27,12 @@ func NewDwhTableConfig(columns *columns.Columns, colsToDelete map[string]time.Ti colsToDelete = make(map[string]time.Time) } + if columns != nil { + for _, col := range columns.GetColumns() { + fmt.Println("colName", col.Name(), "colKind", col.KindDetails) + } + } + return &DwhTableConfig{ columns: columns, columnsToDelete: colsToDelete,