diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 5de18e30f..f2ba0182d 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -5,7 +5,6 @@ import ( "fmt" "log/slog" "os" - "strings" "cloud.google.com/go/bigquery" _ "github.com/viant/bigquery" @@ -76,7 +75,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo } // Load the data - return s.putTable(context.Background(), tableData.TopicConfig().Database, tempTableID, rows) + return s.putTable(context.Background(), tempTableID, rows) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier { @@ -124,52 +123,19 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } -func tableRelName(fqName string) (string, error) { - fqNameParts := strings.Split(fqName, ".") - if len(fqNameParts) < 3 { - return "", fmt.Errorf("invalid fully qualified name: %s", fqName) - } - - return strings.Join(fqNameParts[2:], "."), nil -} - -func (s *Store) putTable(ctx context.Context, dataset string, tableID types.TableIdentifier, rows []*Row) error { +func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, rows []*Row) error { bqTableID, ok := tableID.(TableIdentifier) if !ok { return fmt.Errorf("unable to cast tableID to BigQuery TableIdentifier") } - tableName := tableID.FullyQualifiedName() - relTableName, err := tableRelName(tableName) - if err != nil { - return fmt.Errorf("failed to get table name: %w", err) - } - - if dataset != bqTableID.Dataset() { - // TODO: [tableID] has [Dataset] on it, don't need to pass it along. - slog.Error("BigQuery dataset is different", - slog.String("dataset", dataset), - slog.String("bqTableID.Dataset", bqTableID.Dataset()), - slog.String("fqn", tableName), - ) - } - - if relTableName != bqTableID.Table() { - // TODO: Use [bqTableID.Table] instead of [relTableName]. - slog.Error("BigQuery table name is different", - slog.String("relTableName", relTableName), - slog.String("bqTableID.Table", bqTableID.Table()), - slog.String("fqn", tableName), - ) - } - client := s.GetClient(ctx) defer client.Close() batch := NewBatch(rows, s.batchSize) - inserter := client.Dataset(dataset).Table(relTableName).Inserter() + inserter := client.Dataset(bqTableID.Dataset()).Table(bqTableID.Table()).Inserter() for batch.HasNext() { - if err = inserter.Put(ctx, batch.NextChunk()); err != nil { + if err := inserter.Put(ctx, batch.NextChunk()); err != nil { return fmt.Errorf("failed to insert rows: %w", err) } } diff --git a/clients/bigquery/bigquery_test.go b/clients/bigquery/bigquery_test.go index c6b6d25c9..2a6fca484 100644 --- a/clients/bigquery/bigquery_test.go +++ b/clients/bigquery/bigquery_test.go @@ -13,27 +13,6 @@ import ( "github.com/stretchr/testify/assert" ) -func (b *BigQueryTestSuite) TestTableRelName() { - { - relName, err := tableRelName("project.dataset.table") - assert.NoError(b.T(), err) - assert.Equal(b.T(), "table", relName) - } - { - relName, err := tableRelName("project.dataset.table.table") - assert.NoError(b.T(), err) - assert.Equal(b.T(), "table.table", relName) - } - { - // All the possible errors - _, err := tableRelName("project.dataset") - assert.ErrorContains(b.T(), err, "invalid fully qualified name: project.dataset") - - _, err = tableRelName("project") - assert.ErrorContains(b.T(), err, "invalid fully qualified name: project") - } -} - func TestTempTableName(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_")