Skip to content

Commit

Permalink
Handling all edge cases with TOAST (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 13, 2023
1 parent 7ccb4f0 commit f23f3c8
Show file tree
Hide file tree
Showing 17 changed files with 565 additions and 132 deletions.
12 changes: 6 additions & 6 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/artie-labs/transfer/lib/stringutil"
"strings"
"time"

Expand All @@ -13,14 +12,15 @@ import (
"github.com/artie-labs/transfer/lib/dwh/dml"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

func merge(tableData *optimization.TableData) (string, error) {
var cols []string
// Given all the columns, diff this against SFLK.
for _, col := range tableData.InMemoryColumns.GetColumns() {
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
if col.KindDetails == typing.Invalid {
// Don't update BQ
continue
Expand All @@ -34,7 +34,7 @@ func merge(tableData *optimization.TableData) (string, error) {
for _, value := range tableData.RowsData() {
var colVals []string
for _, col := range cols {
colKind, _ := tableData.InMemoryColumns.GetColumn(col)
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal := value[col]
if colVal != nil {
switch colKind.KindDetails.Kind {
Expand Down Expand Up @@ -99,15 +99,15 @@ func merge(tableData *optimization.TableData) (string, error) {
IdempotentKey: tableData.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: cols,
ColumnsToTypes: *tableData.InMemoryColumns,
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.SoftDelete,
// BigQuery specifically needs it.
SpecialCastingRequired: true,
})
}

func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
if tableData.Rows() == 0 || tableData.InMemoryColumns == nil {
if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
// There's no rows or columns. Let's skip.
return nil
}
Expand All @@ -124,7 +124,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

log := logger.FromContext(ctx)
// Check if all the columns exist in Snowflake
srcKeysMissing, targetKeysMissing := typing.Diff(*tableData.InMemoryColumns, targetColumns, tableData.SoftDelete)
srcKeysMissing, targetKeysMissing := typing.Diff(*tableData.ReadOnlyInMemoryCols(), targetColumns, tableData.SoftDelete)

createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
var sflkCols []string

// Given all the columns, diff this against SFLK.
for _, column := range tableData.InMemoryColumns.GetColumns() {
for _, column := range tableData.ReadOnlyInMemoryCols().GetColumns() {
if column.KindDetails.Kind == typing.Invalid.Kind {
// Don't update Snowflake
continue
Expand All @@ -38,7 +38,7 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
for _, value := range tableData.RowsData() {
var rowValues []string
for _, col := range cols {
colKind, _ := tableData.InMemoryColumns.GetColumn(col)
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal := value[col]
if colVal != nil {
switch colKind.KindDetails.Kind {
Expand Down Expand Up @@ -87,7 +87,7 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
IdempotentKey: tableData.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: cols,
ColumnsToTypes: *tableData.InMemoryColumns,
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.SoftDelete,
})
}
4 changes: 2 additions & 2 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

func (s *Store) merge(ctx context.Context, tableData *optimization.TableData) error {
if tableData.Rows() == 0 || tableData.InMemoryColumns == nil {
if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
// There's no rows. Let's skip.
return nil
}
Expand All @@ -68,7 +68,7 @@ func (s *Store) merge(ctx context.Context, tableData *optimization.TableData) er
}

// Check if all the columns exist in Snowflake
srcKeysMissing, targetKeysMissing := typing.Diff(*tableData.InMemoryColumns, targetColumns, tableData.SoftDelete)
srcKeysMissing, targetKeysMissing := typing.Diff(*tableData.ReadOnlyInMemoryCols(), targetColumns, tableData.SoftDelete)

createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand Down
11 changes: 7 additions & 4 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
types.NewDwhTableConfig(anotherCols, nil, false, true))

err := s.store.Merge(s.ctx, tableData)
_col, isOk := tableData.InMemoryColumns.GetColumn("first_name")
_col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name")
assert.True(s.T(), isOk)
assert.Equal(s.T(), _col.KindDetails, typing.String)
assert.NoError(s.T(), err)
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {

tableData := optimization.NewTableData(&cols, []string{"id"}, topicConfig)
for pk, row := range rowsData {
tableData.InsertRow(pk ,row)
tableData.InsertRow(pk, row)
}

s.store.configMap.AddTableToConfig(topicConfig.ToFqName(constants.Snowflake),
Expand Down Expand Up @@ -253,13 +253,16 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
// Now try to execute merge where 1 of the rows have the column now
for _, pkMap := range tableData.RowsData() {
pkMap["new"] = "123"
tableData.InMemoryColumns = &sflkCols
tableData.SetInMemoryColumns(&sflkCols)

inMemColumns := tableData.ReadOnlyInMemoryCols()
// Since sflkColumns overwrote the format, let's set it correctly again.
tableData.InMemoryColumns.UpdateColumn(typing.Column{
inMemColumns.UpdateColumn(typing.Column{
Name: "created_at",
KindDetails: typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
})

tableData.SetInMemoryColumns(inMemColumns)
break
}

Expand Down
2 changes: 2 additions & 0 deletions lib/debezium/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

const (
ToastUnavailableValuePlaceholder = "__debezium_unavailable_value"

KeyFormatJSON = "org.apache.kafka.connect.json.JsonConverter"
KeyFormatString = "org.apache.kafka.connect.storage.StringConverter"

Expand Down
40 changes: 32 additions & 8 deletions lib/optimization/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package optimization

import (
"context"
"strings"
"time"

"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
"strings"
"time"
)

type TableData struct {
InMemoryColumns *typing.Columns // list of columns
inMemoryColumns *typing.Columns // list of columns
rowsData map[string]map[string]interface{} // pk -> { col -> val }
PrimaryKeys []string

Expand All @@ -25,12 +26,35 @@ type TableData struct {

// This is used for the automatic schema detection
LatestCDCTs time.Time
approxSize int
approxSize int
}

func (t *TableData) SetInMemoryColumns(columns *typing.Columns) {
t.inMemoryColumns = columns
return
}

func (t *TableData) AddInMemoryCol(column typing.Column) {
t.inMemoryColumns.AddColumn(column)
return
}

func (t *TableData) ReadOnlyInMemoryCols() *typing.Columns {
if t.inMemoryColumns == nil {
return nil
}

var cols typing.Columns
for _, col := range t.inMemoryColumns.GetColumns() {
cols.AddColumn(col)
}

return &cols
}

func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicConfig kafkalib.TopicConfig) *TableData {
return &TableData{
InMemoryColumns: inMemoryColumns,
inMemoryColumns: inMemoryColumns,
rowsData: map[string]map[string]interface{}{},
PrimaryKeys: primaryKeys,
TopicConfig: topicConfig,
Expand Down Expand Up @@ -77,7 +101,7 @@ func (t *TableData) Rows() uint {

func (t *TableData) ShouldFlush(ctx context.Context) bool {
settings := config.FromContext(ctx)
return t.Rows() > settings.Config.BufferRows || t.approxSize > settings.Config.FlushSizeKb * 1024
return t.Rows() > settings.Config.BufferRows || t.approxSize > settings.Config.FlushSizeKb*1024
}

// UpdateInMemoryColumnsFromDestination - When running Transfer, we will have 2 column types.
Expand All @@ -91,7 +115,7 @@ func (t *TableData) UpdateInMemoryColumnsFromDestination(cols ...typing.Column)
return
}

for _, inMemoryCol := range t.InMemoryColumns.GetColumns() {
for _, inMemoryCol := range t.inMemoryColumns.GetColumns() {
if inMemoryCol.KindDetails.Kind == typing.Invalid.Kind {
// Don't copy this over because tableData has the wrong colVal
continue
Expand All @@ -118,7 +142,7 @@ func (t *TableData) UpdateInMemoryColumnsFromDestination(cols ...typing.Column)
inMemoryCol.KindDetails.ExtendedTimeDetails.Type = foundColumn.KindDetails.ExtendedTimeDetails.Type
}

t.InMemoryColumns.UpdateColumn(inMemoryCol)
t.inMemoryColumns.UpdateColumn(inMemoryCol)
}
}

Expand Down
64 changes: 44 additions & 20 deletions lib/optimization/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,39 @@ package optimization
import (
"context"
"fmt"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestTableData_ReadOnlyInMemoryCols(t *testing.T) {
// Making sure the columns are actually read only.
var cols typing.Columns
cols.AddColumn(typing.Column{
Name: "name",
KindDetails: typing.String,
})

td := NewTableData(&cols, nil, kafkalib.TopicConfig{})
readOnlyCols := td.ReadOnlyInMemoryCols()
readOnlyCols.AddColumn(typing.Column{
Name: "last_name",
KindDetails: typing.String,
})

// Check if last_name actually exists.
_, isOk := td.ReadOnlyInMemoryCols().GetColumn("last_name")
assert.False(t, isOk)

// Check length is 1.
assert.Equal(t, 1, len(td.ReadOnlyInMemoryCols().GetColumns()))
}

func TestTableData_UpdateInMemoryColumns(t *testing.T) {
var _cols typing.Columns
for colName, colKind := range map[string]typing.KindDetails{
Expand All @@ -27,14 +51,14 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) {
}

tableData := &TableData{
InMemoryColumns: &_cols,
inMemoryColumns: &_cols,
}

extCol, isOk := tableData.InMemoryColumns.GetColumn("do_not_change_format")
extCol, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("do_not_change_format")
assert.True(t, isOk)

extCol.KindDetails.ExtendedTimeDetails.Format = time.RFC3339Nano
tableData.InMemoryColumns.UpdateColumn(typing.Column{
tableData.inMemoryColumns.UpdateColumn(typing.Column{
Name: extCol.Name,
KindDetails: extCol.KindDetails,
})
Expand All @@ -52,21 +76,21 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) {
}

// It's saved back in the original format.
_, isOk = tableData.InMemoryColumns.GetColumn("foo")
_, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("foo")
assert.False(t, isOk)

_, isOk = tableData.InMemoryColumns.GetColumn("FOO")
_, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("FOO")
assert.True(t, isOk)

col, isOk := tableData.InMemoryColumns.GetColumn("CHANGE_me")
col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("CHANGE_me")
assert.True(t, isOk)
assert.Equal(t, ext.DateTime.Type, col.KindDetails.ExtendedTimeDetails.Type)

col, isOk = tableData.InMemoryColumns.GetColumn("bar")
col, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("bar")
assert.True(t, isOk)
assert.Equal(t, typing.Invalid, col.KindDetails)

col, isOk = tableData.InMemoryColumns.GetColumn("do_not_change_format")
col, isOk = tableData.ReadOnlyInMemoryCols().GetColumn("do_not_change_format")
assert.True(t, isOk)
assert.Equal(t, col.KindDetails.Kind, typing.ETime.Kind)
assert.Equal(t, col.KindDetails.ExtendedTimeDetails.Type, ext.DateTimeKindType, "correctly mapped type")
Expand All @@ -76,13 +100,13 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) {
func TestTableData_ShouldFlushRowLength(t *testing.T) {
ctx := context.Background()
ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{Config: &config.Config{
FlushSizeKb: 500,
BufferRows: 2,
FlushSizeKb: 500,
BufferRows: 2,
}})

// Insert 3 rows and confirm that we need to flush.
td := NewTableData(nil, nil, kafkalib.TopicConfig{})
for i := 0; i < 3; i ++ {
for i := 0; i < 3; i++ {
assert.False(t, td.ShouldFlush(ctx))
td.InsertRow(fmt.Sprint(i), map[string]interface{}{
"foo": "bar",
Expand All @@ -95,18 +119,18 @@ func TestTableData_ShouldFlushRowLength(t *testing.T) {
func TestTableData_ShouldFlushRowSize(t *testing.T) {
ctx := context.Background()
ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{Config: &config.Config{
FlushSizeKb: 5,
BufferRows: 20000,
FlushSizeKb: 5,
BufferRows: 20000,
}})

// Insert 3 rows and confirm that we need to flush.
td := NewTableData(nil, nil, kafkalib.TopicConfig{})
for i := 0; i < 45; i ++ {
for i := 0; i < 45; i++ {
assert.False(t, td.ShouldFlush(ctx))
td.InsertRow(fmt.Sprint(i), map[string]interface{}{
"foo": "bar",
"foo": "bar",
"array": []string{"foo", "bar", "dusty", "the aussie", "robin", "jacqueline", "charlie"},
"true": true,
"true": true,
"false": false,
"nested": map[string]interface{}{
"foo": "bar",
Expand All @@ -115,9 +139,9 @@ func TestTableData_ShouldFlushRowSize(t *testing.T) {
}

td.InsertRow("33333", map[string]interface{}{
"foo": "bar",
"foo": "bar",
"array": []string{"foo", "bar", "dusty", "the aussie", "robin", "jacqueline", "charlie"},
"true": true,
"true": true,
"false": false,
"nested": map[string]interface{}{
"foo": "bar",
Expand Down
4 changes: 4 additions & 0 deletions lib/typing/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package typing
type Column struct {
Name string
KindDetails KindDetails
// ToastColumn indicates that the source column is a TOAST column and the value is unavailable
// We have stripped this out.
// Whenever we see the same column where there's an opposite value in `toastColumn`, we will trigger a flush
ToastColumn bool
}

type Columns struct {
Expand Down
Loading

0 comments on commit f23f3c8

Please sign in to comment.