Skip to content

Commit

Permalink
Update UpsertColumn.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 16, 2024
1 parent ee1e09e commit 0282f2f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
7 changes: 6 additions & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
for attempts := 0; attempts < backfillMaxRetries; attempts++ {
backfillErr = BackfillColumn(dwh, col, tableID)
if backfillErr == nil {
tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{
err = tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{
Backfilled: typing.ToPtr(true),
})

if err != nil {
return fmt.Errorf("failed to update column backfilled status: %w", err)
}

break
}

Expand Down
8 changes: 5 additions & 3 deletions lib/typing/columns/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package columns

import (
"errors"
"fmt"
"slices"
"strings"
"sync"
Expand Down Expand Up @@ -108,9 +109,9 @@ type UpsertColumnArg struct {

// UpsertColumn - just a wrapper around UpdateColumn and AddColumn
// If it doesn't find a column, it'll add one where the kind = Invalid.
func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) {
func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error {
if colName == "" {
return
return fmt.Errorf("column name is empty")
}

if col, isOk := c.GetColumn(colName); isOk {
Expand All @@ -127,7 +128,7 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) {
}

c.UpdateColumn(col)
return
return nil
}

col := Column{
Expand All @@ -148,6 +149,7 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) {
}

c.AddColumn(col)
return nil
}

func (c *Columns) AddColumn(col Column) {
Expand Down
12 changes: 6 additions & 6 deletions lib/typing/columns/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,33 @@ func TestColumns_UpsertColumns(t *testing.T) {

// Now selectively update only a, b
for _, key := range []string{"a", "b"} {
cols.UpsertColumn(key, UpsertColumnArg{
assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{
ToastCol: typing.ToPtr(true),
})
}))

// Now inspect.
col, _ := cols.GetColumn(key)
assert.True(t, col.ToastColumn)
}

cols.UpsertColumn("zzz", UpsertColumnArg{})
assert.NoError(t, cols.UpsertColumn("zzz", UpsertColumnArg{}))
zzzCol, _ := cols.GetColumn("zzz")
assert.False(t, zzzCol.ToastColumn)
assert.False(t, zzzCol.primaryKey)
assert.Equal(t, zzzCol.KindDetails, typing.Invalid)

cols.UpsertColumn("aaa", UpsertColumnArg{
assert.NoError(t, cols.UpsertColumn("aaa", UpsertColumnArg{
ToastCol: typing.ToPtr(true),
PrimaryKey: typing.ToPtr(true),
})
}))
aaaCol, _ := cols.GetColumn("aaa")
assert.True(t, aaaCol.ToastColumn)
assert.True(t, aaaCol.primaryKey)
assert.Equal(t, aaaCol.KindDetails, typing.Invalid)

length := len(cols.columns)
for i := 0; i < 500; i++ {
cols.UpsertColumn("", UpsertColumnArg{})
assert.ErrorContains(t, cols.UpsertColumn("", UpsertColumnArg{}), "column name is empty")
}

assert.Equal(t, length, len(cols.columns))
Expand Down
12 changes: 10 additions & 2 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
// Now iterate over pkMap and tag each column that is a primary key
if cols != nil {
for primaryKey := range pkMap {
cols.UpsertColumn(
err = cols.UpsertColumn(
// We need to escape the column name similar to have parity with event.GetColumns()
columns.EscapeName(primaryKey),
columns.UpsertColumnArg{
PrimaryKey: typing.ToPtr(true),
},
)

if err != nil {
return Event{}, fmt.Errorf("failed to upsert column: %w", err)
}
}
}

Expand Down Expand Up @@ -223,9 +227,13 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
}

if toastedCol {
inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{
err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{
ToastCol: typing.ToPtr(true),
})

if err != nil {
return false, "", fmt.Errorf("failed to upsert column: %w", err)
}
} else {
retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName)
if !isOk {
Expand Down

0 comments on commit 0282f2f

Please sign in to comment.