Skip to content

Commit

Permalink
Support default values (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 22, 2023
1 parent 11be53f commit 4084fbd
Show file tree
Hide file tree
Showing 27 changed files with 892 additions and 110 deletions.
13 changes: 10 additions & 3 deletions clients/bigquery/bigquery_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package bigquery

import (
"context"
"testing"

"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/db"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/stretchr/testify/suite"
"testing"
)

type BigQueryTestSuite struct {
suite.Suite
fakeStore *mocks.FakeStore
store *Store
ctx context.Context
}

// SetupTest runs before each test: it builds a context carrying injected
// settings and wires the Store to a fake DB so tests can inspect issued SQL.
func (b *BigQueryTestSuite) SetupTest() {
	// Inject settings so code under test can read config from b.ctx.
	b.ctx = config.InjectSettingsIntoContext(context.Background(), &config.Settings{
		VerboseLogging: false,
	})

	// Back the BigQuery store with a fake so tests can assert on Exec/Query calls.
	b.fakeStore = &mocks.FakeStore{}
	store := db.Store(b.fakeStore)
	b.store = LoadBigQuery(b.ctx, &store)
}

func TestBigQueryTestSuite(t *testing.T) {
Expand Down
48 changes: 37 additions & 11 deletions clients/bigquery/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bigquery

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand All @@ -16,15 +17,25 @@ import (
)

const (
	// Column names returned by querying
	// `project.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name="table";
	// COLUMN_FIELD_PATHS (rather than COLUMNS) also exposes the column description.
	describeNameCol    = "column_name"
	describeTypeCol    = "data_type"
	describeCommentCol = "description"
)

// columnMetadata holds the data type and description (comment) of a single
// column as reported by BigQuery's information schema.
type columnMetadata struct {
	Type    string
	Comment string
}

func (s *Store) describeTable(ctx context.Context, tableData *optimization.TableData) (map[string]columnMetadata, error) {
log := logger.FromContext(ctx)
rows, err := s.Query(fmt.Sprintf("SELECT column_name, data_type FROM `%s.INFORMATION_SCHEMA.COLUMNS` WHERE table_name='%s';",
tableData.TopicConfig.Database, tableData.Name()))

// We modified this from COLUMN to COLUMN_FIELD_PATHS, so we can get the column description.
// https://cloud.google.com/bigquery/docs/information-schema-column-field-paths
query := fmt.Sprintf("SELECT column_name, data_type, description FROM `%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name='%s';",
tableData.TopicConfig.Database, tableData.Name())

rows, err := s.Query(query)
defer func() {
if rows != nil {
err = rows.Close()
Expand All @@ -37,10 +48,10 @@ func (s *Store) describeTable(ctx context.Context, tableData *optimization.Table
if err != nil {
// The query will not fail if the table doesn't exist. It will simply return 0 rows.
// It WILL fail if the dataset doesn't exist or if it encounters any other forms of error.
return nil, err
return nil, fmt.Errorf("failed to query, err: %v, query: %v", err, query)
}

retMap := make(map[string]string)
retMap := make(map[string]columnMetadata)
for rows != nil && rows.Next() {
// figure out what columns were returned
// the column names will be the JSON object field keys
Expand Down Expand Up @@ -72,7 +83,10 @@ func (s *Store) describeTable(ctx context.Context, tableData *optimization.Table
row[columnNameList[idx]] = strings.ToLower(fmt.Sprint(*interfaceVal))
}

retMap[row[describeNameCol]] = row[describeTypeCol]
retMap[row[describeNameCol]] = columnMetadata{
Type: row[describeTypeCol],
Comment: row[describeCommentCol],
}
}

return retMap, nil
Expand All @@ -91,8 +105,20 @@ func (s *Store) getTableConfig(ctx context.Context, tableData *optimization.Tabl
}

var bqColumns columns.Columns
for column, columnType := range retMap {
bqColumns.AddColumn(columns.NewColumn(column, typing.BigQueryTypeToKind(columnType)))
for column, metadata := range retMap {
col := columns.NewColumn(column, typing.BigQueryTypeToKind(metadata.Type))
if metadata.Comment != "" {
// Try to parse the comment.
var _colComment constants.ColComment
err = json.Unmarshal([]byte(metadata.Comment), &_colComment)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal comment, err: %v", err)
}

col.SetBackfilled(_colComment.Backfilled)
}

bqColumns.AddColumn(col)
}

// If retMap is empty, it'll create a new table.
Expand Down
4 changes: 4 additions & 0 deletions clients/bigquery/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ func ColumnAlreadyExistErr(err error) bool {
// Error ends up looking like something like this: Column already exists: _string at [1:39]
return strings.Contains(err.Error(), "Column already exists")
}

func TableUpdateQuotaErr(err error) bool {
return strings.Contains(err.Error(), "Exceeded rate limits: too many table update operations for this table")
}
73 changes: 73 additions & 0 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package bigquery
import (
"context"
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/transfer/lib/jitter"

"github.com/artie-labs/transfer/lib/typing/columns"

Expand Down Expand Up @@ -52,6 +58,47 @@ func merge(tableData *optimization.TableData) ([]*Row, error) {
return rows, nil
}

// BackfillColumn will perform a backfill to the destination and also update the comment within a transaction.
// Source: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#column_set_options_list
// backfillColumn writes the column's default value into existing NULL rows,
// then marks the column as backfilled by setting its description option to
// {"backfilled": true}. Note: the two statements are issued as separate,
// non-transactional Exec calls — if the second fails, the backfill will be
// re-attempted (idempotently) on the next run.
// Source: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#column_set_options_list
func (s *Store) backfillColumn(ctx context.Context, column columns.Column, fqTableName string) error {
	if !column.ShouldBackfill() {
		// Nothing to do: no default value, or the column is already backfilled.
		return nil
	}

	defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{
		Escape:   true,
		BigQuery: true,
	})
	if err != nil {
		return fmt.Errorf("failed to escape default value, err: %v", err)
	}

	fqTableName = strings.ToLower(fqTableName)
	escapedCol := column.Name(&columns.NameArgs{Escape: true, DestKind: s.Label()})

	// UPDATE table SET col = default_val WHERE col IS NULL
	query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`,
		fqTableName, escapedCol, defaultVal, escapedCol)

	logger.FromContext(ctx).WithFields(map[string]interface{}{
		"colName": column.Name(nil),
		"query":   query,
		"table":   fqTableName,
	}).Info("backfilling column")

	if _, err = s.Exec(query); err != nil {
		return fmt.Errorf("failed to backfill, err: %v, query: %v", err, query)
	}

	// ALTER TABLE table ALTER COLUMN col SET OPTIONS (description=...)
	query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);",
		fqTableName, escapedCol, `{"backfilled": true}`,
	)
	// Wrap with the query for parity with the backfill error path above;
	// TableUpdateQuotaErr's substring check still matches the wrapped message.
	if _, err = s.Exec(query); err != nil {
		return fmt.Errorf("failed to update column comment, err: %v, query: %v", err, query)
	}

	return nil
}

func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
// TODO - write test for this.
if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
Expand Down Expand Up @@ -136,6 +183,32 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}
// End temporary table creation

// Backfill columns if necessary
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
var attempts int
for {
err = s.backfillColumn(ctx, col, tableData.ToFqName(ctx, s.Label()))
if err == nil {
tableConfig.Columns().UpsertColumn(col.Name(nil), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
break
}

if TableUpdateQuotaErr(err) {
err = nil
attempts += 1
time.Sleep(time.Duration(jitter.JitterMs(1500, attempts)) * time.Millisecond)
} else {
defaultVal, _ := col.DefaultValue(nil)
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v",
col.Name(nil), defaultVal, err)
}
}

}

// Perform actual merge now
rows, err := merge(tableData)
if err != nil {
log.WithError(err).Warn("failed to generate the merge query")
Expand Down
80 changes: 80 additions & 0 deletions clients/bigquery/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package bigquery

import (
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
)

// TestBackfillColumn drives backfillColumn through the fake store and checks
// the exact SQL issued: columns with no pending default produce no Exec calls,
// while columns needing a backfill produce an UPDATE (fill NULLs) followed by
// an ALTER ... SET OPTIONS that records {"backfilled": true} in the description.
func (b *BigQueryTestSuite) TestBackfillColumn() {
	fqTableName := "db.public.tableName"
	type _testCase struct {
		name        string
		col         columns.Column
		expectErr   bool
		backfillSQL string // expected UPDATE statement; empty means no backfill expected
		commentSQL  string // expected ALTER ... SET OPTIONS statement
	}

	// Has a default value but is already marked backfilled — should be a no-op.
	backfilledCol := columns.NewColumn("foo", typing.Invalid)
	backfilledCol.SetDefaultValue(true)
	backfilledCol.SetBackfilled(true)

	// Boolean default that still needs backfilling.
	needsBackfillCol := columns.NewColumn("foo", typing.Invalid)
	needsBackfillCol.SetDefaultValue(true)

	// String default — expected SQL shows it single-quoted.
	needsBackfillColStr := columns.NewColumn("foo2", typing.String)
	needsBackfillColStr.SetDefaultValue("hello there")

	// Numeric default — expected SQL shows it unquoted.
	needsBackfillColNum := columns.NewColumn("foo3", typing.Float)
	needsBackfillColNum.SetDefaultValue(3.5)
	testCases := []_testCase{
		{
			name: "col that doesn't have default val",
			col:  columns.NewColumn("foo", typing.Invalid),
		},
		{
			name: "col that has default value but already backfilled",
			col:  backfilledCol,
		},
		{
			name:        "col that has default value that needs to be backfilled (boolean)",
			col:         needsBackfillCol,
			backfillSQL: `UPDATE db.public.tablename SET foo = true WHERE foo IS NULL;`,
			commentSQL:  "ALTER TABLE db.public.tablename ALTER COLUMN foo SET OPTIONS (description=`{\"backfilled\": true}`);",
		},
		{
			name:        "col that has default value that needs to be backfilled (string)",
			col:         needsBackfillColStr,
			backfillSQL: `UPDATE db.public.tablename SET foo2 = 'hello there' WHERE foo2 IS NULL;`,
			commentSQL:  "ALTER TABLE db.public.tablename ALTER COLUMN foo2 SET OPTIONS (description=`{\"backfilled\": true}`);",
		},
		{
			name:        "col that has default value that needs to be backfilled (number)",
			col:         needsBackfillColNum,
			backfillSQL: `UPDATE db.public.tablename SET foo3 = 3.5 WHERE foo3 IS NULL;`,
			commentSQL:  "ALTER TABLE db.public.tablename ALTER COLUMN foo3 SET OPTIONS (description=`{\"backfilled\": true}`);",
		},
	}

	// index tracks our position in the fake store's Exec call history; each
	// backfilled column consumes two consecutive calls (UPDATE, then ALTER).
	var index int
	for _, testCase := range testCases {
		err := b.store.backfillColumn(b.ctx, testCase.col, fqTableName)
		if testCase.expectErr {
			assert.Error(b.T(), err, testCase.name)
			continue
		}

		assert.NoError(b.T(), err, testCase.name)
		if testCase.backfillSQL != "" && testCase.commentSQL != "" {
			backfillSQL, _ := b.fakeStore.ExecArgsForCall(index)
			assert.Equal(b.T(), testCase.backfillSQL, backfillSQL, testCase.name)

			commentSQL, _ := b.fakeStore.ExecArgsForCall(index + 1)
			assert.Equal(b.T(), testCase.commentSQL, commentSQL, testCase.name)
			index += 2
		} else {
			// No-op cases: the total Exec call count must not have advanced.
			assert.Equal(b.T(), index, b.fakeStore.ExecCallCount())
		}
	}
}
21 changes: 18 additions & 3 deletions clients/snowflake/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package snowflake

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/dwh/types"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/typing"
)

func (s *Store) getTableConfig(ctx context.Context, fqName string, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
Expand Down Expand Up @@ -75,11 +79,22 @@ func (s *Store) getTableConfig(ctx context.Context, fqName string, dropDeletedCo
row[columnNameList[idx]] = strings.ToLower(fmt.Sprint(*interfaceVal))
}

snowflakeColumns.AddColumn(columns.NewColumn(row[describeNameCol], typing.SnowflakeTypeToKind(row[describeTypeCol])))
col := columns.NewColumn(row[describeNameCol], typing.SnowflakeTypeToKind(row[describeTypeCol]))
if comment, isOk := row[describeCommentCol]; isOk && comment != "<nil>" {
// Try to parse the comment.
var _colComment constants.ColComment
err = json.Unmarshal([]byte(comment), &_colComment)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal comment, err: %v", err)
}

col.SetBackfilled(_colComment.Backfilled)
}

snowflakeColumns.AddColumn(col)
}

sflkTableConfig := types.NewDwhTableConfig(&snowflakeColumns, nil, tableMissing, dropDeletedColumns)
s.configMap.AddTableToConfig(fqName, sflkTableConfig)

return sflkTableConfig, nil
}
5 changes: 3 additions & 2 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Store struct {

const (
	// Column names from the output of DESC table;
	// "comment" carries the column description used to track backfill state.
	describeNameCol    = "name"
	describeTypeCol    = "type"
	describeCommentCol = "comment"
)

func (s *Store) Label() constants.DestinationKind {
Expand Down
Loading

0 comments on commit 4084fbd

Please sign in to comment.