From 04ef1610c8eff3732fa9e3a819ee0856fcd096dd Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 18:17:22 -0700 Subject: [PATCH 1/5] [bigquery] Always escape table names --- clients/bigquery/bigquery_test.go | 6 +++--- clients/bigquery/merge_test.go | 12 ++++++------ clients/bigquery/tableid.go | 2 +- clients/bigquery/tableid_test.go | 6 +++--- lib/destination/ddl/ddl_temp_test.go | 2 +- lib/sql/escape.go | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/clients/bigquery/bigquery_test.go b/clients/bigquery/bigquery_test.go index c6b6d25c9..f191f9e4e 100644 --- a/clients/bigquery/bigquery_test.go +++ b/clients/bigquery/bigquery_test.go @@ -38,15 +38,15 @@ func TestTempTableName(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_") assert.GreaterOrEqual(t, lastUnderscore, 0) - epoch, err := strconv.ParseInt(tableName[lastUnderscore+1:], 10, 64) + epoch, err := strconv.ParseInt(tableName[lastUnderscore+1:len(tableName)-1], 10, 64) assert.NoError(t, err) assert.Greater(t, time.Unix(epoch, 0), time.Now().Add(5*time.Hour)) // default TTL is 6 hours from now - return tableName[:lastUnderscore] + return tableName[:lastUnderscore] + string(tableName[len(tableName)-1]) } store := &Store{config: config.Config{BigQuery: &config.BigQuery{ProjectID: "123454321"}}} tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{Database: "db", Schema: "schema"}, "table") tableID := store.IdentifierFor(tableData.TopicConfig(), tableData.Name()) tempTableName := shared.TempTableID(tableID, "sUfFiX").FullyQualifiedName() - assert.Equal(t, "`123454321`.`db`.table___artie_sUfFiX", trimTTL(tempTableName)) + assert.Equal(t, "`123454321`.`db`.`table___artie_sUfFiX`", trimTTL(tempTableName)) } diff --git a/clients/bigquery/merge_test.go b/clients/bigquery/merge_test.go index e531de9e0..f0b3b1fb6 100644 --- a/clients/bigquery/merge_test.go +++ b/clients/bigquery/merge_test.go @@ -41,20 +41,20 @@ func (b *BigQueryTestSuite) TestBackfillColumn() { { name: "col that has default value that needs to be backfilled (boolean)", col: needsBackfillCol, - backfillSQL: "UPDATE `db`.`public`.tableName SET foo = true WHERE foo IS NULL;", - commentSQL: "ALTER TABLE `db`.`public`.tableName ALTER COLUMN foo SET OPTIONS (description=`{\"backfilled\": true}`);", + backfillSQL: "UPDATE `db`.`public`.`tableName` SET foo = true WHERE foo IS NULL;", + commentSQL: "ALTER TABLE `db`.`public`.`tableName` ALTER COLUMN foo SET OPTIONS (description=`{\"backfilled\": true}`);", }, { name: "col that has default value that needs to be backfilled (string)", col: needsBackfillColStr, - backfillSQL: "UPDATE `db`.`public`.tableName SET foo2 = 'hello there' WHERE foo2 IS NULL;", - commentSQL: "ALTER TABLE `db`.`public`.tableName ALTER COLUMN foo2 SET OPTIONS (description=`{\"backfilled\": true}`);", + backfillSQL: "UPDATE `db`.`public`.`tableName` SET foo2 = 'hello there' WHERE foo2 IS NULL;", + commentSQL: "ALTER TABLE `db`.`public`.`tableName` ALTER COLUMN foo2 SET OPTIONS (description=`{\"backfilled\": true}`);", }, { name: "col that has default value that needs to be backfilled (number)", col: needsBackfillColNum, - backfillSQL: "UPDATE `db`.`public`.tableName SET foo3 = 3.5 WHERE foo3 IS NULL;", - commentSQL: "ALTER TABLE `db`.`public`.tableName ALTER COLUMN foo3 SET OPTIONS (description=`{\"backfilled\": true}`);", + backfillSQL: "UPDATE `db`.`public`.`tableName` SET foo3 = 3.5 WHERE foo3 IS NULL;", + commentSQL: "ALTER TABLE `db`.`public`.`tableName` ALTER COLUMN foo3 SET OPTIONS (description=`{\"backfilled\": true}`);", }, } diff --git a/clients/bigquery/tableid.go b/clients/bigquery/tableid.go index 25ae66162..71f772d15 100644 --- a/clients/bigquery/tableid.go +++ b/clients/bigquery/tableid.go @@ -45,6 +45,6 @@ func (ti TableIdentifier) FullyQualifiedName() string { "`%s`.`%s`.%s", ti.projectID, ti.dataset, - sql.EscapeNameIfNecessary(ti.table, false, constants.BigQuery), + sql.EscapeName(ti.table, false, constants.BigQuery), ) } diff --git a/clients/bigquery/tableid_test.go b/clients/bigquery/tableid_test.go index c60b1fea5..49ccfcaf7 100644 --- a/clients/bigquery/tableid_test.go +++ b/clients/bigquery/tableid_test.go @@ -17,9 +17,9 @@ func TestTableIdentifier_WithTable(t *testing.T) { } func TestTableIdentifier_FullyQualifiedName(t *testing.T) { - // Table name that does not need escaping: - assert.Equal(t, "`project`.`dataset`.foo", NewTableIdentifier("project", "dataset", "foo").FullyQualifiedName()) + // Table name that does not contain a reserved word: + assert.Equal(t, "`project`.`dataset`.`foo`", NewTableIdentifier("project", "dataset", "foo").FullyQualifiedName()) - // Table name that needs escaping: + // Table name that contains a reserved word: assert.Equal(t, "`project`.`dataset`.`table`", NewTableIdentifier("project", "dataset", "table").FullyQualifiedName()) } diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index f4e80c2a5..62d5ab0be 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -115,6 +115,6 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { assert.Equal(d.T(), 1, d.fakeBigQueryStore.ExecCallCount()) bqQuery, _ := d.fakeBigQueryStore.ExecArgsForCall(0) // Cutting off the expiration_timestamp since it's time based. - assert.Contains(d.T(), bqQuery, "CREATE TABLE IF NOT EXISTS `db`.`schema`.tempTableName (foo string,bar float64,`select` string) OPTIONS (expiration_timestamp =") + assert.Contains(d.T(), bqQuery, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (foo string,bar float64,`select` string) OPTIONS (expiration_timestamp =") } } diff --git a/lib/sql/escape.go b/lib/sql/escape.go index 6caf0bc14..83506de0e 100644 --- a/lib/sql/escape.go +++ b/lib/sql/escape.go @@ -14,7 +14,7 @@ var symbolsToEscape = []string{":"} func EscapeNameIfNecessary(name string, uppercaseEscNames bool, destKind constants.DestinationKind) string { if needsEscaping(name, destKind) { - return escapeName(name, uppercaseEscNames, destKind) + return EscapeName(name, uppercaseEscNames, destKind) } return name } @@ -48,7 +48,7 @@ func needsEscaping(name string, destKind constants.DestinationKind) bool { return false } -func escapeName(name string, uppercaseEscNames bool, destKind constants.DestinationKind) string { +func EscapeName(name string, uppercaseEscNames bool, destKind constants.DestinationKind) string { if uppercaseEscNames { name = strings.ToUpper(name) } From c8ad9c1e919924e855f3562e898dcec63cae2d97 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 18:21:36 -0700 Subject: [PATCH 2/5] Update --- clients/bigquery/tableid_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/bigquery/tableid_test.go b/clients/bigquery/tableid_test.go index 49ccfcaf7..57cc5a2ba 100644 --- a/clients/bigquery/tableid_test.go +++ b/clients/bigquery/tableid_test.go @@ -17,9 +17,9 @@ func TestTableIdentifier_WithTable(t *testing.T) { } func TestTableIdentifier_FullyQualifiedName(t *testing.T) { - // Table name that does not contain a reserved word: + // Table name that is not a reserved word: assert.Equal(t, "`project`.`dataset`.`foo`", NewTableIdentifier("project", "dataset", "foo").FullyQualifiedName()) - // Table name that contains a reserved word: + // Table name that is a reserved word: assert.Equal(t, "`project`.`dataset`.`table`", NewTableIdentifier("project", "dataset", "table").FullyQualifiedName()) } From 768257e7a69bd6f5f389e8634eedacfba4347765 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 18:22:20 -0700 Subject: [PATCH 3/5] Update comment --- clients/bigquery/tableid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/bigquery/tableid.go b/clients/bigquery/tableid.go index 71f772d15..3452e4e3e 100644 --- a/clients/bigquery/tableid.go +++ b/clients/bigquery/tableid.go @@ -40,7 +40,7 @@ func (ti TableIdentifier) WithTable(table string) types.TableIdentifier { func (ti TableIdentifier) FullyQualifiedName() string { // The fully qualified name for BigQuery is: project_id.dataset.tableName. - // We are escaping the project_id and dataset because there could be special characters. + // We are escaping the project_id, dataset, and table because there could be special characters. return fmt.Sprintf( "`%s`.`%s`.%s", ti.projectID, From b69476af3e4954288e2ed0d0922b2f4f39945425 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 21:19:43 -0700 Subject: [PATCH 4/5] Simplify --- clients/bigquery/tableid.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/clients/bigquery/tableid.go b/clients/bigquery/tableid.go index 3452e4e3e..5c091e1b5 100644 --- a/clients/bigquery/tableid.go +++ b/clients/bigquery/tableid.go @@ -3,9 +3,7 @@ package bigquery import ( "fmt" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/sql" ) type TableIdentifier struct { @@ -41,10 +39,5 @@ func (ti TableIdentifier) WithTable(table string) types.TableIdentifier { func (ti TableIdentifier) FullyQualifiedName() string { // The fully qualified name for BigQuery is: project_id.dataset.tableName. // We are escaping the project_id, dataset, and table because there could be special characters. - return fmt.Sprintf( - "`%s`.`%s`.%s", - ti.projectID, - ti.dataset, - sql.EscapeName(ti.table, false, constants.BigQuery), - ) + return fmt.Sprintf("`%s`.`%s`.`%s`", ti.projectID, ti.dataset, ti.table) } From d1690e046712191b59389b86b3c23349f68621bc Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 21:32:27 -0700 Subject: [PATCH 5/5] Fix --- clients/bigquery/bigquery.go | 32 ++++++++++++++----------------- clients/bigquery/bigquery_test.go | 21 -------------------- 2 files changed, 14 insertions(+), 39 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index bc668c724..379ff2c90 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -5,7 +5,6 @@ import ( "fmt" "log/slog" "os" - "strings" "cloud.google.com/go/bigquery" _ "github.com/viant/bigquery" @@ -123,32 +122,29 @@ func (s *Store) GetClient(ctx context.Context) *bigquery.Client { return client } - -func tableRelName(fqName string) (string, error) { - fqNameParts := strings.Split(fqName, ".") - if len(fqNameParts) < 3 { - return "", fmt.Errorf("invalid fully qualified name: %s", fqName) +func (s *Store) putTable(ctx context.Context, dataset string, tableID types.TableIdentifier, rows []*Row) error { + bqTableID, ok := tableID.(TableIdentifier) + if !ok { + return fmt.Errorf("uanble to cast tableID to a BigQuery TableIdentifier") } - return strings.Join(fqNameParts[2:], "."), nil -} - -func (s *Store) putTable(ctx context.Context, dataset string, tableID types.TableIdentifier, rows []*Row) error { - // TODO: [tableID] has [Dataset] on it, don't need to pass it along. - tableName := tableID.FullyQualifiedName() - // TODO: Can probably do `tableName := tableID.Table()` here. - relTableName, err := tableRelName(tableName) - if err != nil { - return fmt.Errorf("failed to get table name: %w", err) + if dataset != bqTableID.Dataset() { + // TODO: [tableID] has [Dataset] on it, don't need to pass it along. + // Remove if we don't see this in the logs. + slog.Error("dataset is different from tableID dataset", + slog.String("dataset", dataset), + slog.String("bqTableID.Dataset", bqTableID.Dataset()), + slog.String("fqName", bqTableID.FullyQualifiedName()), + ) } client := s.GetClient(ctx) defer client.Close() batch := NewBatch(rows, s.batchSize) - inserter := client.Dataset(dataset).Table(relTableName).Inserter() + inserter := client.Dataset(dataset).Table(bqTableID.Table()).Inserter() for batch.HasNext() { - if err = inserter.Put(ctx, batch.NextChunk()); err != nil { + if err := inserter.Put(ctx, batch.NextChunk()); err != nil { return fmt.Errorf("failed to insert rows: %w", err) } } diff --git a/clients/bigquery/bigquery_test.go b/clients/bigquery/bigquery_test.go index f191f9e4e..035c7555c 100644 --- a/clients/bigquery/bigquery_test.go +++ b/clients/bigquery/bigquery_test.go @@ -13,27 +13,6 @@ import ( "github.com/stretchr/testify/assert" ) -func (b *BigQueryTestSuite) TestTableRelName() { - { - relName, err := tableRelName("project.dataset.table") - assert.NoError(b.T(), err) - assert.Equal(b.T(), "table", relName) - } - { - relName, err := tableRelName("project.dataset.table.table") - assert.NoError(b.T(), err) - assert.Equal(b.T(), "table.table", relName) - } - { - // All the possible errors - _, err := tableRelName("project.dataset") - assert.ErrorContains(b.T(), err, "invalid fully qualified name: project.dataset") - - _, err = tableRelName("project") - assert.ErrorContains(b.T(), err, "invalid fully qualified name: project") - } -} - func TestTempTableName(t *testing.T) { trimTTL := func(tableName string) string { lastUnderscore := strings.LastIndex(tableName, "_")