From ee1e09e38bbfcb667f4487972b9d07cd00ad39fb Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 15 Oct 2024 13:27:25 -0700 Subject: [PATCH 1/2] Tweak `maxRequestByteSize` for BigQuery (#961) --- clients/bigquery/bigquery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 626af5651..5713d8321 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -31,8 +31,8 @@ import ( const ( GooglePathToCredentialsEnvKey = "GOOGLE_APPLICATION_CREDENTIALS" - // Storage Write API is limited to 10 MiB, subtract 250 KiB to account for request overhead. - maxRequestByteSize = (10 * 1024 * 1024) - (250 * 1024) + // Storage Write API is limited to 10 MiB, subtract 400 KiB to account for request overhead. + maxRequestByteSize = (10 * 1024 * 1024) - (400 * 1024) ) type Store struct { From e1c82853982cd815767c0d30648de47cc304a885 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 15 Oct 2024 19:55:15 -0700 Subject: [PATCH 2/2] Update UpsertColumn to include an error response (#963) --- clients/shared/merge.go | 7 ++++- lib/typing/columns/columns.go | 36 +++++++++++++------------ lib/typing/columns/columns_test.go | 43 ++++++++++++++---------------- models/event/event.go | 12 +++++++-- 4 files changed, 55 insertions(+), 43 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 7378173c8..e2442fa06 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -90,9 +90,14 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi for attempts := 0; attempts < backfillMaxRetries; attempts++ { backfillErr = BackfillColumn(dwh, col, tableID) if backfillErr == nil { - tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ + err = tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ Backfilled: typing.ToPtr(true), }) + + if err != nil { + return fmt.Errorf("failed to update column backfilled status: %w", err) + } + break } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 779fdc839..1af0a2c23 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -2,6 +2,7 @@ package columns import ( "errors" + "fmt" "slices" "strings" "sync" @@ -108,9 +109,9 @@ type UpsertColumnArg struct { // UpsertColumn - just a wrapper around UpdateColumn and AddColumn // If it doesn't find a column, it'll add one where the kind = Invalid. -func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) { +func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error { if colName == "" { - return + return fmt.Errorf("column name is empty") } if col, isOk := c.GetColumn(colName); isOk { @@ -127,27 +128,28 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) { } c.UpdateColumn(col) - return - } + } else { + _col := Column{ + name: colName, + KindDetails: typing.Invalid, + } - col := Column{ - name: colName, - KindDetails: typing.Invalid, - } + if arg.ToastCol != nil { + _col.ToastColumn = *arg.ToastCol + } - if arg.ToastCol != nil { - col.ToastColumn = *arg.ToastCol - } + if arg.PrimaryKey != nil { + _col.primaryKey = *arg.PrimaryKey + } - if arg.PrimaryKey != nil { - col.primaryKey = *arg.PrimaryKey - } + if arg.Backfilled != nil { + _col.backfilled = *arg.Backfilled + } - if arg.Backfilled != nil { - col.backfilled = *arg.Backfilled + c.AddColumn(_col) } - c.AddColumn(col) + return nil } func (c *Columns) AddColumn(col Column) { diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 7c283e09e..dca59855e 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -194,36 +194,33 @@ func TestColumns_UpsertColumns(t *testing.T) { // Now selectively update only a, b for _, key := range []string{"a", "b"} { - cols.UpsertColumn(key, UpsertColumnArg{ + assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{ ToastCol: typing.ToPtr(true), - }) + })) // Now inspect. col, _ := cols.GetColumn(key) assert.True(t, col.ToastColumn) } - - cols.UpsertColumn("zzz", UpsertColumnArg{}) - zzzCol, _ := cols.GetColumn("zzz") - assert.False(t, zzzCol.ToastColumn) - assert.False(t, zzzCol.primaryKey) - assert.Equal(t, zzzCol.KindDetails, typing.Invalid) - - cols.UpsertColumn("aaa", UpsertColumnArg{ - ToastCol: typing.ToPtr(true), - PrimaryKey: typing.ToPtr(true), - }) - aaaCol, _ := cols.GetColumn("aaa") - assert.True(t, aaaCol.ToastColumn) - assert.True(t, aaaCol.primaryKey) - assert.Equal(t, aaaCol.KindDetails, typing.Invalid) - - length := len(cols.columns) - for i := 0; i < 500; i++ { - cols.UpsertColumn("", UpsertColumnArg{}) + { + assert.NoError(t, cols.UpsertColumn("zzz", UpsertColumnArg{})) + zzzCol, _ := cols.GetColumn("zzz") + assert.False(t, zzzCol.ToastColumn) + assert.False(t, zzzCol.primaryKey) + assert.Equal(t, zzzCol.KindDetails, typing.Invalid) } - - assert.Equal(t, length, len(cols.columns)) + { + assert.NoError(t, cols.UpsertColumn("aaa", UpsertColumnArg{ + ToastCol: typing.ToPtr(true), + PrimaryKey: typing.ToPtr(true), + })) + + aaaCol, _ := cols.GetColumn("aaa") + assert.True(t, aaaCol.ToastColumn) + assert.True(t, aaaCol.primaryKey) + assert.Equal(t, aaaCol.KindDetails, typing.Invalid) + } + assert.ErrorContains(t, cols.UpsertColumn("", UpsertColumnArg{}), "column name is empty") } func TestColumns_Add_Duplicate(t *testing.T) { diff --git a/models/event/event.go b/models/event/event.go index 70a786df2..889bc2845 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -54,13 +54,17 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi // Now iterate over pkMap and tag each column that is a primary key if cols != nil { for primaryKey := range pkMap { - cols.UpsertColumn( + err = cols.UpsertColumn( // We need to escape the column name similar to have parity with event.GetColumns() columns.EscapeName(primaryKey), columns.UpsertColumnArg{ PrimaryKey: typing.ToPtr(true), }, ) + + if err != nil { + return Event{}, fmt.Errorf("failed to upsert column: %w", err) + } } } @@ -223,9 +227,13 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali } if toastedCol { - inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ + err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ ToastCol: typing.ToPtr(true), }) + + if err != nil { + return false, "", fmt.Errorf("failed to upsert column: %w", err) + } } else { retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName) if !isOk {