Improving BigQuery throughput (#92)
Tang8330 authored May 20, 2023
1 parent e412bc4 commit 2185957
Showing 28 changed files with 724 additions and 570 deletions.
19 changes: 19 additions & 0 deletions clients/bigquery/bigquery.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"

"cloud.google.com/go/bigquery"
_ "github.com/viant/bigquery"

"github.com/artie-labs/transfer/lib/config"
@@ -33,6 +34,24 @@ func (s *Store) Label() constants.DestinationKind {
return constants.BigQuery
}

func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
settings := config.FromContext(ctx)
client, err := bigquery.NewClient(ctx, settings.Config.BigQuery.ProjectID)
if err != nil {
logger.FromContext(ctx).WithError(err).Fatalf("failed to get bigquery client")
}

return client
}

func (s *Store) PutTable(ctx context.Context, dataset, tableName string, rows []*Row) error {
client := s.GetClient(ctx)
defer client.Close()

inserter := client.Dataset(dataset).Table(tableName).Inserter()
return inserter.Put(ctx, rows)
}

func LoadBigQuery(ctx context.Context, _store *db.Store) *Store {
if _store != nil {
// Used for tests.
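The new GetClient and PutTable helpers move row delivery off generated SQL and onto the streaming-insert path of cloud.google.com/go/bigquery. A minimal standalone sketch of the same pattern, with placeholder project, dataset, and table names, and a ValueSaver row type mirroring the Row added in merge.go below:

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

// item mirrors the Row type added in merge.go: it satisfies bigquery.ValueSaver.
type item struct {
	data map[string]bigquery.Value
}

// Save returns the column values for one row; bigquery.NoDedupeID opts out of
// best-effort de-duplication, matching Row.Save in this commit.
func (i *item) Save() (map[string]bigquery.Value, string, error) {
	return i.data, bigquery.NoDedupeID, nil
}

func main() {
	ctx := context.Background()

	// "my-project", "my_dataset" and "my_table" are placeholders.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("failed to get bigquery client: %v", err)
	}
	defer client.Close()

	rows := []*item{
		{data: map[string]bigquery.Value{"id": 1, "name": "foo"}},
		{data: map[string]bigquery.Value{"id": 2, "name": "bar"}},
	}

	inserter := client.Dataset("my_dataset").Table("my_table").Inserter()
	if err := inserter.Put(ctx, rows); err != nil {
		log.Fatalf("streaming insert failed: %v", err)
	}
}

Inserter.Put accepts anything implementing bigquery.ValueSaver; this is the path the commit switches to in place of the per-row UNION ALL subquery that merge() used to build.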
40 changes: 16 additions & 24 deletions clients/bigquery/cast.go
@@ -3,62 +3,54 @@ package bigquery
import (
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/array"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing"
)

func CastColVal(colVal interface{}, colKind typing.Column) (string, error) {
func CastColVal(colVal interface{}, colKind typing.Column) (interface{}, error) {
if colVal != nil {
switch colKind.KindDetails.Kind {
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
}

switch extTime.NestedKind.Type {
// https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data
case ext.DateTimeKindType:
colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano))
colVal = extTime.StringUTC(ext.BigQueryDateTimeFormat)
case ext.DateKindType:
colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format))
colVal = extTime.String(ext.PostgresDateFormat)
case ext.TimeKindType:
colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ))
colVal = extTime.String(typing.StreamingTimeFormat)
}
// All the other types do not need string wrapping.
case typing.String.Kind, typing.Struct.Kind:
colVal = stringutil.Wrap(colVal)
colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal))
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder))
} else {
// This is how you cast string -> JSON
colVal = fmt.Sprintf("JSON %s", colVal)
colVal = map[string]interface{}{
"key": constants.ToastUnavailableValuePlaceholder,
}
}
}
case typing.Array.Kind:
var err error
colVal, err = array.InterfaceToArrayStringEscaped(colVal)
colVal, err = array.InterfaceToArrayString(colVal)
if err != nil {
return "", err
return nil, err
}

return colVal, nil
}
} else {
if colKind.KindDetails == typing.String {
// BigQuery does not like null as a string for CTEs.
// It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING
colVal = "''"
} else {
colVal = "null"
}

return fmt.Sprint(colVal), nil
}

return fmt.Sprint(colVal), nil
return nil, nil
}
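CastColVal now returns native values destined for a map[string]bigquery.Value rather than SQL literal fragments, so DATETIME, DATE, and TIME values are emitted as plain strings in the formats the streaming API accepts (see the linked BigQuery docs). The real layout constants (ext.BigQueryDateTimeFormat, ext.PostgresDateFormat, typing.StreamingTimeFormat) live elsewhere in the repo; a small sketch with assumed Go layouts that reproduce the strings asserted in cast_test.go below:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed layouts, chosen only to reproduce the values asserted in cast_test.go;
	// the repo defines the actual constants elsewhere.
	const (
		dateTimeLayout = "2006-01-02 15:04:05.999" // DATETIME, sent in UTC
		dateLayout     = "2006-01-02"              // DATE
		timeLayout     = "15:04:05"                // TIME
	)

	birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)
	fmt.Println(birthday.Format(dateTimeLayout)) // 2022-09-06 03:19:24.942
	fmt.Println(birthday.Format(dateLayout))     // 2022-09-06
	fmt.Println(birthday.Format(timeLayout))     // 03:19:24
}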
77 changes: 58 additions & 19 deletions clients/bigquery/cast_test.go
@@ -2,6 +2,9 @@ package bigquery

import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"

@@ -14,40 +17,76 @@ func TestCastColVal(t *testing.T) {
colVal interface{}
colKind typing.Column

expectedErr error
expectedString string
expectedErr error
expectedValue interface{}
}

tsKind := typing.ETime
tsKind.ExtendedTimeDetails = &ext.DateTime

dateKind := typing.ETime
dateKind.ExtendedTimeDetails = &ext.Date

birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)
birthdayTSExt, err := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "")
assert.NoError(t, err)

birthdayDateExt, err := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "")
assert.NoError(t, err)

timeKind := typing.ETime
timeKind.ExtendedTimeDetails = &ext.Time
birthdayTimeExt, err := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "")
assert.NoError(t, err)

testCases := []_testCase{
{
name: "escaping string",
colVal: "foo",
colKind: typing.Column{KindDetails: typing.String},
expectedString: "'foo'",
name: "escaping string",
colVal: "foo",
colKind: typing.Column{KindDetails: typing.String},
expectedValue: "foo",
},
{
name: "123 as int",
colVal: 123,
colKind: typing.Column{KindDetails: typing.Integer},
expectedValue: "123",
},
{
name: "struct",
colVal: `{"hello": "world"}`,
colKind: typing.Column{KindDetails: typing.Struct},
expectedValue: `{"hello": "world"}`,
},
{
name: "array",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{KindDetails: typing.Array},
expectedValue: []string{"1", "2", "3", "4", "5"},
},
{
name: "123 as int",
colVal: 123,
colKind: typing.Column{KindDetails: typing.Integer},
expectedString: "123",
name: "timestamp",
colVal: birthdayTSExt,
colKind: typing.Column{KindDetails: tsKind},
expectedValue: "2022-09-06 03:19:24.942",
},
{
name: "struct",
colVal: `{"hello": "world"}`,
colKind: typing.Column{KindDetails: typing.Struct},
expectedString: `JSON '{"hello": "world"}'`,
name: "date",
colVal: birthdayDateExt,
colKind: typing.Column{KindDetails: dateKind},
expectedValue: "2022-09-06",
},
{
name: "array",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{KindDetails: typing.Array},
expectedString: `['1','2','3','4','5']`,
name: "time",
colVal: birthdayTimeExt,
colKind: typing.Column{KindDetails: timeKind},
expectedValue: "03:19:24",
},
}

for _, testCase := range testCases {
actualString, actualErr := CastColVal(testCase.colVal, testCase.colKind)
assert.Equal(t, testCase.expectedErr, actualErr, testCase.name)
assert.Equal(t, testCase.expectedString, actualString, testCase.name)
assert.Equal(t, testCase.expectedValue, actualString, testCase.name)
}
}
8 changes: 0 additions & 8 deletions clients/bigquery/constants.go

This file was deleted.

111 changes: 71 additions & 40 deletions clients/bigquery/merge.go
@@ -3,66 +3,57 @@ package bigquery
import (
"context"
"fmt"
"strings"

"cloud.google.com/go/bigquery"

"github.com/artie-labs/transfer/lib/dwh/dml"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/dwh/ddl"
"github.com/artie-labs/transfer/lib/dwh/dml"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing"
)

func merge(tableData *optimization.TableData) (string, error) {
var cols []string
// Given all the columns, diff this against SFLK.
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
if col.KindDetails == typing.Invalid {
// Don't update BQ
continue
}
type Row struct {
data map[string]bigquery.Value
}

cols = append(cols, col.Name)
func NewRow(data map[string]bigquery.Value) *Row {
return &Row{
data: data,
}
}

var rowValues []string
firstRow := true
func (r *Row) Save() (map[string]bigquery.Value, string, error) {
return r.data, bigquery.NoDedupeID, nil
}

func merge(tableData *optimization.TableData) ([]*Row, error) {
var rows []*Row
for _, value := range tableData.RowsData() {
var colVals []string
for _, col := range cols {
data := make(map[string]bigquery.Value)
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal, err := CastColVal(value[col], colKind)
if err != nil {
return "", err
return nil, err
}

if firstRow {
colVal = fmt.Sprintf("%v as %s", colVal, col)
if colVal != nil {
data[col] = colVal
}

colVals = append(colVals, colVal)
}

firstRow = false
rowValues = append(rowValues, fmt.Sprintf("SELECT %s", strings.Join(colVals, ",")))
rows = append(rows, NewRow(data))
}

subQuery := strings.Join(rowValues, " UNION ALL ")

return dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(constants.BigQuery),
SubQuery: subQuery,
IdempotentKey: tableData.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: cols,
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.SoftDelete,
BigQueryTypeCasting: true,
})
return rows, nil
}

func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
// TODO - write test for this.

if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
// There's no rows or columns. Let's skip.
return nil
@@ -85,7 +76,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(constants.BigQuery),
FqTableName: tableData.ToFqName(s.Label()),
CreateTable: tableConfig.CreateTable,
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
@@ -104,7 +95,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
deleteAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(constants.BigQuery),
FqTableName: tableData.ToFqName(s.Label()),
CreateTable: false,
ColumnOp: constants.Delete,
CdcTime: tableData.LatestCDCTs,
@@ -133,14 +124,54 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
}
}

// Start temporary table creation
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(s.Label()), tableData.TempTableSuffix()),
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
}

if err = ddl.AlterTable(ctx, tempAlterTableArgs, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table, error: %v", err)
}
// End temporary table creation

tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
query, err := merge(tableData)
rows, err := merge(tableData)
if err != nil {
log.WithError(err).Warn("failed to generate the merge query")
return err
}

log.WithField("query", query).Debug("executing...")
_, err = s.Exec(query)
return err
tableName := fmt.Sprintf("%s_%s", tableData.TableName, tableData.TempTableSuffix())
err = s.PutTable(ctx, tableData.Database, tableName, rows)
if err != nil {
return fmt.Errorf("failed to insert into temp table: %s, error: %v", tableName, err)
}

mergeQuery, err := dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(constants.BigQuery),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(),
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.SoftDelete,
BigQuery: true,
})

if err != nil {
return err
}

_, err = s.Exec(mergeQuery)
if err != nil {
return err
}

ddl.DropTemporaryTable(ctx, s, tempAlterTableArgs.FqTableName)
return nil
}
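The reworked Merge flow is: create a temporary table with the in-memory schema, stream the rows into it via PutTable, run a single MERGE from the temp table into the target, then drop the temp table with ddl.DropTemporaryTable. The MERGE text comes from dml.MergeStatement and is executed through s.Exec; a standalone sketch of running an equivalent DML job with the Go client, using a hypothetical statement and placeholder names:

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	// "my-project" and the dataset/table names below are placeholders.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("failed to get bigquery client: %v", err)
	}
	defer client.Close()

	// Hypothetical statement in the shape dml.MergeStatement produces: the target
	// table is merged against the streamed temp table on the primary key.
	q := client.Query(`
MERGE INTO my_dataset.target c
USING my_dataset.target_tmp cc
ON c.id = cc.id
WHEN MATCHED THEN
  UPDATE SET name = cc.name
WHEN NOT MATCHED THEN
  INSERT (id, name) VALUES (cc.id, cc.name)`)

	job, err := q.Run(ctx)
	if err != nil {
		log.Fatalf("failed to start merge job: %v", err)
	}

	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatalf("failed waiting for merge job: %v", err)
	}
	if err := status.Err(); err != nil {
		log.Fatalf("merge job failed: %v", err)
	}
}

In the commit itself the temp table name is tableData.ToFqName(s.Label()) joined with tableData.TempTableSuffix(), so each merge stages into its own short-lived table before the final MERGE.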