Skip to content

Commit

Permalink
Supporting SMT routing (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 4, 2023
1 parent e55652f commit 36cde56
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 77 deletions.
4 changes: 2 additions & 2 deletions clients/bigquery/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/artie-labs/transfer/lib/typing"
)

func (s *Store) getTableConfig(_ context.Context, tableData *optimization.TableData) (*types.DwhTableConfig, error) {
fqName := tableData.ToFqName(constants.BigQuery)
func (s *Store) getTableConfig(ctx context.Context, tableData *optimization.TableData) (*types.DwhTableConfig, error) {
fqName := tableData.ToFqName(ctx, constants.BigQuery)
tc := s.configMap.TableConfig(fqName)
if tc != nil {
return tc, nil
Expand Down
8 changes: 4 additions & 4 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(s.Label()),
FqTableName: tableData.ToFqName(ctx, s.Label()),
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Expand All @@ -88,7 +88,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
deleteAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(s.Label()),
FqTableName: tableData.ToFqName(ctx, s.Label()),
CreateTable: false,
ColumnOp: constants.Delete,
CdcTime: tableData.LatestCDCTs,
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(s.Label()), tableData.TempTableSuffix()),
FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label()), tableData.TempTableSuffix()),
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Expand All @@ -146,7 +146,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

mergeQuery, err := dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(constants.BigQuery),
FqTableName: tableData.ToFqName(ctx, constants.BigQuery),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Expand Down
5 changes: 3 additions & 2 deletions clients/snowflake/merge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package snowflake

import (
"context"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -47,7 +48,7 @@ func escapeCols(cols []typing.Column) (colsToUpdate []string, colsToUpdateEscape
return
}

func getMergeStatement(tableData *optimization.TableData) (string, error) {
func getMergeStatement(ctx context.Context, tableData *optimization.TableData) (string, error) {
var tableValues []string
colsToUpdate, colsToUpdateEscaped := escapeCols(tableData.ReadOnlyInMemoryCols().GetColumns())
for _, value := range tableData.RowsData() {
Expand Down Expand Up @@ -97,7 +98,7 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
strings.Join(tableValues, ","), tableData.Name(), strings.Join(colsToUpdate, ","))

return dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(constants.Snowflake),
FqTableName: tableData.ToFqName(ctx, constants.Snowflake),
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Expand Down
10 changes: 5 additions & 5 deletions clients/snowflake/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *SnowflakeTestSuite) TestMergeNoDeleteFlag() {
})

tableData := optimization.NewTableData(&cols, []string{"id"}, kafkalib.TopicConfig{}, "")
_, err := getMergeStatement(tableData)
_, err := getMergeStatement(s.ctx, tableData)
assert.Error(s.T(), err, "getMergeStatement failed")

}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *SnowflakeTestSuite) TestMerge() {
tableData.InsertRow(pk, row)
}

mergeSQL, err := getMergeStatement(tableData)
mergeSQL, err := getMergeStatement(s.ctx, tableData)
assert.NoError(s.T(), err, "getMergeStatement failed")
assert.Contains(s.T(), mergeSQL, "robin")
assert.Contains(s.T(), mergeSQL, "false")
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *SnowflakeTestSuite) TestMergeWithSingleQuote() {
tableData.InsertRow(pk, row)
}

mergeSQL, err := getMergeStatement(tableData)
mergeSQL, err := getMergeStatement(s.ctx, tableData)
assert.NoError(s.T(), err, "getMergeStatement failed")
assert.Contains(s.T(), mergeSQL, `I can\'t fail`)
}
Expand Down Expand Up @@ -267,7 +267,7 @@ func (s *SnowflakeTestSuite) TestMergeJson() {
tableData.InsertRow(pk, row)
}

mergeSQL, err := getMergeStatement(tableData)
mergeSQL, err := getMergeStatement(s.ctx, tableData)
assert.NoError(s.T(), err, "getMergeStatement failed")
assert.Contains(s.T(), mergeSQL, `"label": "2\\" pipe"`)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *SnowflakeTestSuite) TestMergeJSONKey() {
tableData.InsertRow(pk, row)
}

mergeSQL, err := getMergeStatement(tableData)
mergeSQL, err := getMergeStatement(s.ctx, tableData)
assert.NoError(s.T(), err, "merge failed")
// Check if MERGE INTO FQ Table exists.
assert.True(s.T(), strings.Contains(mergeSQL, "MERGE INTO shop.public.customer c"), mergeSQL)
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *Store) merge(ctx context.Context, tableData *optimization.TableData) er
return nil
}

fqName := tableData.ToFqName(constants.Snowflake)
fqName := tableData.ToFqName(ctx, constants.Snowflake)
tableConfig, err := s.getTableConfig(ctx, fqName, tableData.TopicConfig.DropDeletedColumns)
if err != nil {
return err
Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *Store) merge(ctx context.Context, tableData *optimization.TableData) er
}

tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
query, err := getMergeStatement(tableData)
query, err := getMergeStatement(ctx, tableData)
if err != nil {
log.WithError(err).Warn("failed to generate the getMergeStatement query")
return err
Expand Down
18 changes: 9 additions & 9 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
})
}

s.store.configMap.AddTableToConfig(tableData.ToFqName(constants.Snowflake),
s.store.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake),
types.NewDwhTableConfig(&anotherCols, nil, false, true))

err := s.store.Merge(s.ctx, tableData)
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
tableData.InsertRow(pk, row)
}

s.store.configMap.AddTableToConfig(tableData.ToFqName(constants.Snowflake),
s.store.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake),
types.NewDwhTableConfig(&cols, nil, false, true))

s.fakeStore.ExecReturnsOnCall(0, nil, fmt.Errorf("390114: Authentication token has expired. The user must authenticate again."))
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {
tableData.InsertRow(pk, row)
}

s.store.configMap.AddTableToConfig(tableData.ToFqName(constants.Snowflake),
s.store.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake),
types.NewDwhTableConfig(&cols, nil, false, true))
err := s.store.Merge(s.ctx, tableData)
assert.Nil(s.T(), err)
Expand Down Expand Up @@ -237,18 +237,18 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
})

config := types.NewDwhTableConfig(&sflkCols, nil, false, true)
s.store.configMap.AddTableToConfig(tableData.ToFqName(constants.Snowflake), config)
s.store.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake), config)

err := s.store.Merge(s.ctx, tableData)
assert.Nil(s.T(), err)
s.fakeStore.ExecReturns(nil, nil)
assert.Equal(s.T(), s.fakeStore.ExecCallCount(), 1, "called merge")

// Check the temp deletion table now.
assert.Equal(s.T(), len(s.store.configMap.TableConfig(tableData.ToFqName(constants.Snowflake)).ReadOnlyColumnsToDelete()), 1,
s.store.configMap.TableConfig(tableData.ToFqName(constants.Snowflake)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.store.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()), 1,
s.store.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete())

_, isOk := s.store.configMap.TableConfig(tableData.ToFqName(constants.Snowflake)).ReadOnlyColumnsToDelete()["new"]
_, isOk := s.store.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()["new"]
assert.True(s.T(), isOk)

// Now try to execute merge where 1 of the rows have the column now
Expand All @@ -273,8 +273,8 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
assert.Equal(s.T(), s.fakeStore.ExecCallCount(), 2, "called merge again")

// Caught up now, so columns should be 0.
assert.Equal(s.T(), len(s.store.configMap.TableConfig(tableData.ToFqName(constants.Snowflake)).ReadOnlyColumnsToDelete()), 0,
s.store.configMap.TableConfig(tableData.ToFqName(constants.Snowflake)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.store.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()), 0,
s.store.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete())
}

func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() {
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *Store) mergeWithStages(ctx context.Context, tableData *optimization.Tab
return nil
}

fqName := tableData.ToFqName(constants.Snowflake)
fqName := tableData.ToFqName(ctx, constants.Snowflake)
tableConfig, err := s.getTableConfig(ctx, fqName, tableData.TopicConfig.DropDeletedColumns)
if err != nil {
return err
Expand Down Expand Up @@ -167,14 +167,14 @@ func (s *Store) mergeWithStages(ctx context.Context, tableData *optimization.Tab
}

tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
temporaryTableName := fmt.Sprintf("%s_%s", tableData.ToFqName(s.Label()), tableData.TempTableSuffix())
temporaryTableName := fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label()), tableData.TempTableSuffix())
if err = s.prepareTempTable(ctx, tableData, tableConfig, temporaryTableName); err != nil {
return err
}

// Prepare merge statement
mergeQuery, err := dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(constants.Snowflake),
FqTableName: tableData.ToFqName(ctx, constants.Snowflake),
SubQuery: temporaryTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Expand Down
5 changes: 3 additions & 2 deletions lib/artie/message.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package artie

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/segmentio/kafka-go"
"time"
)

type Kind int
Expand Down
5 changes: 5 additions & 0 deletions lib/config/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import "time"
const (
ToastUnavailableValuePlaceholder = "__debezium_unavailable_value"

// DebeziumTopicRoutingKey - https://debezium.io/documentation/reference/stable/transformations/topic-routing.html#by-logical-table-router-key-field-name
// This key is added to ensure no compaction or mutation happens since multiple tables are now going into the same topic and may have overlaping key ids.
// We will strip this out from our partition key parsing.
DebeziumTopicRoutingKey = "__dbz__physicalTableIdentifier"

SnowflakeExpireCommentPrefix = "expires:"
ArtiePrefix = "__artie"
DeleteColumnMarker = ArtiePrefix + "_delete"
Expand Down
7 changes: 6 additions & 1 deletion lib/debezium/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
)

const (
Expand Down Expand Up @@ -56,7 +58,8 @@ func parsePartitionKeyString(key []byte) (map[string]interface{}, error) {

retMap[kvParts[0]] = strings.Join(kvParts[1:], "=")
}

// Skip this key.
delete(retMap, constants.DebeziumTopicRoutingKey)
return retMap, nil
}

Expand Down Expand Up @@ -86,5 +89,7 @@ func parsePartitionKeyStruct(key []byte) (map[string]interface{}, error) {
return nil, fmt.Errorf("key object is malformated")
}

// Skip this key.
delete(pkStruct, constants.DebeziumTopicRoutingKey)
return pkStruct, nil
}
36 changes: 35 additions & 1 deletion lib/debezium/keys_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package debezium

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParsePartitionKeyString(t *testing.T) {
Expand All @@ -19,6 +20,11 @@ func TestParsePartitionKeyString(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, kv["id"], "47")

kv, err = parsePartitionKeyString([]byte("Struct{id=47,__dbz__physicalTableIdentifier=dbserver1.inventory.customers}"))
assert.NoError(t, err)
assert.Equal(t, kv["id"], "47")
assert.Equal(t, 1, len(kv))

kv, err = parsePartitionKeyString([]byte("Struct{uuid=d4a5bc26-9ae6-4dd4-8894-39cbcd2d526c}"))
assert.Nil(t, err)
assert.Equal(t, kv["uuid"], "d4a5bc26-9ae6-4dd4-8894-39cbcd2d526c")
Expand Down Expand Up @@ -111,4 +117,32 @@ func TestParsePartitionKeyStruct(t *testing.T) {
assert.Equal(t, kv["quarter_id"], float64(1))
assert.Equal(t, kv["student_id"], float64(1))
assert.Equal(t, kv["course_id"], "course1")

// Normal key with Debezium change event key (SMT)
smtKey := `{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "__dbz__physicalTableIdentifier"
}],
"optional": false,
"name": "dbserver1.inventory.all_tables.Key"
},
"payload": {
"id": 1001,
"__dbz__physicalTableIdentifier": "dbserver1.inventory.customers"
}
}`

kv, err = parsePartitionKeyStruct([]byte(smtKey))
assert.NoError(t, err)
assert.Equal(t, kv["id"], float64(1001))
assert.Equal(t, 1, len(kv))
}
Loading

0 comments on commit 36cde56

Please sign in to comment.