Skip to content

Commit

Permalink
[BigQuery] Use StorageWrite API + Dedupe (#726)
Browse files Browse the repository at this point in the history
Co-authored-by: Nathan <[email protected]>
  • Loading branch information
Tang8330 and nathan-artie authored Jun 17, 2024
1 parent 355beb5 commit a320314
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 11 deletions.
3 changes: 2 additions & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeCommentCol = "description"
useStorageWriteAPI = true
)

type Store struct {
Expand Down Expand Up @@ -71,7 +72,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
}

// Load the data
if s.config.BigQuery.UseStorageWriteAPI {
if useStorageWriteAPI {
return s.putTableViaStorageWriteAPI(context.Background(), bqTempTableID, tableData)
} else {
return s.putTableViaLegacyAPI(context.Background(), bqTempTableID, tableData)
Expand Down
11 changes: 10 additions & 1 deletion clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (BigQueryDialect) IsColumnAlreadyExistsErr(err error) bool {
return strings.Contains(err.Error(), "Column already exists")
}

func (BigQueryDialect) IsTableDoesNotExistErr(err error) bool {
func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {
return false
}

Expand Down Expand Up @@ -154,6 +154,15 @@ func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)
return fmt.Sprintf(`(SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1)`,
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(primaryKeysEscaped, ", "),
)
}

func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)

Expand Down
2 changes: 2 additions & 0 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
AdditionalEqualityStrings: additionalEqualityStrings,
// BigQuery has DDL quotas.
RetryColBackfill: true,
// We are using BigQuery's streaming API which doesn't guarantee exactly once semantics
SubQueryDedupe: true,
})
}

Expand Down
4 changes: 4 additions & 0 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table
return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder)
}

func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
panic("not implemented")
}

func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
panic("not implemented") // We don't currently support deduping for MS SQL.
}
Expand Down
4 changes: 4 additions & 0 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.Ta
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string {
return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName())
}

func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd)

Expand Down
5 changes: 2 additions & 3 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
}
}

temporaryTableName := temporaryTableID.FullyQualifiedName()
subQuery := temporaryTableName
subQuery := temporaryTableID.FullyQualifiedName()
if opts.SubQueryDedupe {
subQuery = fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, temporaryTableName)
subQuery = dwh.Dialect().BuildDedupeTableQuery(temporaryTableID, tableData.PrimaryKeys())
}

if subQuery == "" {
Expand Down
4 changes: 4 additions & 0 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (sd SnowflakeDialect) BuildIsNotToastValueExpression(tableAlias constants.T
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
panic("not implemented")
}

func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd)

Expand Down
11 changes: 5 additions & 6 deletions lib/config/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import "fmt"
type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet.
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
}

func (b *BigQuery) LoadDefaultValues() {
Expand Down
1 change: 1 addition & 0 deletions lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Dialect interface {
BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string
BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string
BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string
BuildMergeQueries(
tableID TableIdentifier,
Expand Down

0 comments on commit a320314

Please sign in to comment.