Skip to content

Commit

Permalink
Default Values.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 17, 2024
1 parent fda4bf1 commit 5e51e0a
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 38 deletions.
7 changes: 7 additions & 0 deletions clients/bigquery/dialect/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dialect

import "github.com/artie-labs/transfer/lib/sql"

func (BigQueryDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Backfill
}
7 changes: 7 additions & 0 deletions clients/mssql/dialect/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dialect

import "github.com/artie-labs/transfer/lib/sql"

func (MSSQLDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Native
}
7 changes: 7 additions & 0 deletions clients/redshift/dialect/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dialect

import "github.com/artie-labs/transfer/lib/sql"

func (RedshiftDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Backfill
}
75 changes: 39 additions & 36 deletions clients/shared/default_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log/slog"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
mssqlDialect "github.com/artie-labs/transfer/clients/mssql/dialect"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
Expand Down Expand Up @@ -60,44 +59,48 @@ func DefaultValue(column columns.Column, dialect sql.Dialect) (any, error) {
}

func BackfillColumn(dwh destination.DataWarehouse, column columns.Column, tableID sql.TableIdentifier) error {
if !column.ShouldBackfill() {
// If we don't need to backfill, don't backfill.
return nil
}
switch dwh.Dialect().GetDefaultValueStrategy() {
case sql.Backfill:
if !column.ShouldBackfill() {
// If we don't need to backfill, don't backfill.
return nil
}

if _, ok := dwh.Dialect().(mssqlDialect.MSSQLDialect); ok {
// TODO: Support MSSQL column backfill
return nil
}
defaultVal, err := DefaultValue(column, dwh.Dialect())
if err != nil {
return fmt.Errorf("failed to escape default value: %w", err)
}

defaultVal, err := DefaultValue(column, dwh.Dialect())
if err != nil {
return fmt.Errorf("failed to escape default value: %w", err)
}
escapedCol := dwh.Dialect().QuoteIdentifier(column.Name())
query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`,
// UPDATE table SET col = default_val WHERE col IS NULL
tableID.FullyQualifiedName(), escapedCol, defaultVal, escapedCol,
)
slog.Info("Backfilling column",
slog.String("colName", column.Name()),
slog.String("query", query),
slog.String("table", tableID.FullyQualifiedName()),
)

escapedCol := dwh.Dialect().QuoteIdentifier(column.Name())
query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`,
// UPDATE table SET col = default_val WHERE col IS NULL
tableID.FullyQualifiedName(), escapedCol, defaultVal, escapedCol,
)
slog.Info("Backfilling column",
slog.String("colName", column.Name()),
slog.String("query", query),
slog.String("table", tableID.FullyQualifiedName()),
)

if _, err = dwh.Exec(query); err != nil {
return fmt.Errorf("failed to backfill, err: %w, query: %v", err, query)
}
if _, err = dwh.Exec(query); err != nil {
return fmt.Errorf("failed to backfill, err: %w, query: %v", err, query)
}

query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`)
if _, ok := dwh.Dialect().(bigQueryDialect.BigQueryDialect); ok {
query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);",
// ALTER TABLE table ALTER COLUMN col set OPTIONS (description=...)
tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`,
)
}
query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`)
if _, ok := dwh.Dialect().(bigQueryDialect.BigQueryDialect); ok {
query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);",
// ALTER TABLE table ALTER COLUMN col set OPTIONS (description=...)
tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`,
)
}

_, err = dwh.Exec(query)
return err
_, err = dwh.Exec(query)

Check failure on line 97 in clients/shared/default_value.go

View workflow job for this annotation

GitHub Actions / test

this value of err is never used (SA4006)
return nil

case sql.Native:
// TODO: Support native strat
return nil
default:
return fmt.Errorf("unknown default value strategy: %q", dwh.Dialect().GetDefaultValueStrategy())
}
}
3 changes: 1 addition & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
}

// Columns that are missing in DWH, but exist in our CDC stream.
err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...)
if err != nil {
if err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

Expand Down
7 changes: 7 additions & 0 deletions clients/snowflake/dialect/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dialect

import "github.com/artie-labs/transfer/lib/sql"

func (SnowflakeDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Backfill
}
12 changes: 12 additions & 0 deletions lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

type DefaultValueStrategy int

const (
// Backfill - use backfill strategy for default values
Backfill DefaultValueStrategy = iota
// Native - set default values directly into the destination
Native
)

type TableIdentifier interface {
EscapedTable() string
Table() string
Expand Down Expand Up @@ -36,4 +45,7 @@ type Dialect interface {
// containsHardDeletes is only used for Redshift where we do not issue a DELETE statement if there are no hard deletes in the batch
containsHardDeletes bool,
) ([]string, error)

// Default values
GetDefaultValueStrategy() DefaultValueStrategy
}

0 comments on commit 5e51e0a

Please sign in to comment.