diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 7378173c8..e2442fa06 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -90,9 +90,14 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi for attempts := 0; attempts < backfillMaxRetries; attempts++ { backfillErr = BackfillColumn(dwh, col, tableID) if backfillErr == nil { - tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ + err = tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ Backfilled: typing.ToPtr(true), }) + + if err != nil { + return fmt.Errorf("failed to update column backfilled status: %w", err) + } + break } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 779fdc839..792c45c9c 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -2,6 +2,7 @@ package columns import ( "errors" + "fmt" "slices" "strings" "sync" @@ -108,9 +109,9 @@ type UpsertColumnArg struct { // UpsertColumn - just a wrapper around UpdateColumn and AddColumn // If it doesn't find a column, it'll add one where the kind = Invalid. -func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) { +func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error { if colName == "" { - return + return fmt.Errorf("column name is empty") } if col, isOk := c.GetColumn(colName); isOk { @@ -127,7 +128,7 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) { } c.UpdateColumn(col) - return + return nil } col := Column{ @@ -148,6 +149,7 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) { } c.AddColumn(col) + return nil } func (c *Columns) AddColumn(col Column) { diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 7c283e09e..1b9ea82f3 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -194,25 +194,25 @@ func TestColumns_UpsertColumns(t *testing.T) { // Now selectively update only a, b for _, key := range []string{"a", "b"} { - cols.UpsertColumn(key, UpsertColumnArg{ + assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{ ToastCol: typing.ToPtr(true), - }) + })) // Now inspect. col, _ := cols.GetColumn(key) assert.True(t, col.ToastColumn) } - cols.UpsertColumn("zzz", UpsertColumnArg{}) + assert.NoError(t, cols.UpsertColumn("zzz", UpsertColumnArg{})) zzzCol, _ := cols.GetColumn("zzz") assert.False(t, zzzCol.ToastColumn) assert.False(t, zzzCol.primaryKey) assert.Equal(t, zzzCol.KindDetails, typing.Invalid) - cols.UpsertColumn("aaa", UpsertColumnArg{ + assert.NoError(t, cols.UpsertColumn("aaa", UpsertColumnArg{ ToastCol: typing.ToPtr(true), PrimaryKey: typing.ToPtr(true), - }) + })) aaaCol, _ := cols.GetColumn("aaa") assert.True(t, aaaCol.ToastColumn) assert.True(t, aaaCol.primaryKey) @@ -220,7 +220,7 @@ func TestColumns_UpsertColumns(t *testing.T) { length := len(cols.columns) for i := 0; i < 500; i++ { - cols.UpsertColumn("", UpsertColumnArg{}) + assert.ErrorContains(t, cols.UpsertColumn("", UpsertColumnArg{}), "column name is empty") } assert.Equal(t, length, len(cols.columns)) diff --git a/models/event/event.go b/models/event/event.go index 70a786df2..889bc2845 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -54,13 +54,17 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi // Now iterate over pkMap and tag each column that is a primary key if cols != nil { for primaryKey := range pkMap { - cols.UpsertColumn( + err = cols.UpsertColumn( // We need to escape the column name similar to have parity with event.GetColumns() columns.EscapeName(primaryKey), columns.UpsertColumnArg{ PrimaryKey: typing.ToPtr(true), }, ) + + if err != nil { + return Event{}, fmt.Errorf("failed to upsert column: %w", err) + } } } @@ -223,9 +227,13 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali } if toastedCol { - inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ + err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ ToastCol: typing.ToPtr(true), }) + + if err != nil { + return false, "", fmt.Errorf("failed to upsert column: %w", err) + } } else { retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName) if !isOk {