Skip to content

Commit

Permalink
Merge branch 'master' into nv/kill-column-rawname
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 2, 2024
2 parents 17f5499 + 83b3ddd commit 167b126
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 131 deletions.
2 changes: 1 addition & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
TableID: tableID,
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig().IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(dwh.Dialect()),
PrimaryKeys: tableData.PrimaryKeys(),
Columns: tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.TopicConfig().SoftDelete,
DestKind: dwh.Label(),
Expand Down
16 changes: 8 additions & 8 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type MergeArgument struct {
TableID types.TableIdentifier
SubQuery string
IdempotentKey string
PrimaryKeys []columns.Wrapper
PrimaryKeys []columns.Column

// AdditionalEqualityStrings is used for handling BigQuery partitioned table merges
AdditionalEqualityStrings []string
Expand Down Expand Up @@ -94,7 +94,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))

Check failure on line 97 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

primaryKey.Name undefined (type columns.Column has no field or method Name)

Check failure on line 97 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

primaryKey.Name undefined (type columns.Column has no field or method Name)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}

Expand All @@ -115,7 +115,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
m.PrimaryKeys[0].EscapedName()),
m.PrimaryKeys[0].Name(m.Dialect)),

Check failure on line 118 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

m.PrimaryKeys[0].Name undefined (type columns.Column has no field or method Name)

Check failure on line 118 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

m.PrimaryKeys[0].Name undefined (type columns.Column has no field or method Name)
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
Expand All @@ -142,7 +142,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {

var pks []string
for _, pk := range m.PrimaryKeys {
pks = append(pks, pk.EscapedName())
pks = append(pks, pk.Name(m.Dialect))

Check failure on line 145 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

pk.Name undefined (type columns.Column has no field or method Name)

Check failure on line 145 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

pk.Name undefined (type columns.Column has no field or method Name)
}

parts := []string{
Expand All @@ -159,7 +159,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
m.PrimaryKeys[0].EscapedName()),
m.PrimaryKeys[0].Name(m.Dialect)),

Check failure on line 162 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

m.PrimaryKeys[0].Name undefined (type columns.Column has no field or method Name)
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
Expand Down Expand Up @@ -207,15 +207,15 @@ func (m *MergeArgument) GetStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))

Check failure on line 210 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

primaryKey.Name undefined (type columns.Column has no field or method Name)
pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName())
if !isOk {
return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns)
}

if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind {
// BigQuery requires special casting to compare two JSON objects.
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.EscapedName(), primaryKey.EscapedName())
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))

Check failure on line 218 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

primaryKey.Name undefined (type columns.Column has no field or method Name)
}

equalitySQLParts = append(equalitySQLParts, equalitySQL)
Expand Down Expand Up @@ -295,7 +295,7 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))

Check failure on line 298 in lib/destination/dml/merge.go

View workflow job for this annotation

GitHub Actions / test

primaryKey.Name undefined (type columns.Column has no field or method Name) (compile)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}

Expand Down
4 changes: 2 additions & 2 deletions lib/destination/dml/merge_bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestMergeStatement_TempTable(t *testing.T) {
mergeArg := &MergeArgument{
TableID: MockTableIdentifier{"customers.orders"},
SubQuery: "customers.orders_tmp",
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_id", typing.Invalid), sql.BigQueryDialect{})},
PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)},
Columns: &cols,
DestKind: constants.BigQuery,
Dialect: sql.BigQueryDialect{},
Expand All @@ -41,7 +41,7 @@ func TestMergeStatement_JSONKey(t *testing.T) {
mergeArg := &MergeArgument{
TableID: MockTableIdentifier{"customers.orders"},
SubQuery: "customers.orders_tmp",
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_oid", typing.Invalid), sql.BigQueryDialect{})},
PrimaryKeys: []columns.Column{columns.NewColumn("order_oid", typing.Invalid)},
Columns: &cols,
DestKind: constants.BigQuery,
Dialect: sql.BigQueryDialect{},
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/dml/merge_mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Test_GetMSSQLStatement(t *testing.T) {
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), sql.MSSQLDialect{})},
PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.MSSQL,
Dialect: sql.MSSQLDialect{},
Expand Down
8 changes: 4 additions & 4 deletions lib/destination/dml/merge_parts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestMergeStatementPartsValidation(t *testing.T) {
}

type result struct {
PrimaryKeys []columns.Wrapper
PrimaryKeys []columns.Column
ColumnsToTypes columns.Columns
}

Expand All @@ -47,11 +47,11 @@ func getBasicColumnsForTest(compositeKey bool) result {
cols.AddColumn(textToastCol)
cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

var pks []columns.Wrapper
pks = append(pks, columns.NewWrapper(idCol, sql.MSSQLDialect{}))
var pks []columns.Column
pks = append(pks, idCol)

if compositeKey {
pks = append(pks, columns.NewWrapper(emailCol, sql.MSSQLDialect{}))
pks = append(pks, emailCol)
}

return result{
Expand Down
33 changes: 14 additions & 19 deletions lib/destination/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@ func TestMergeStatementSoftDelete(t *testing.T) {
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

dialect := sql.SnowflakeDialect{}
for _, idempotentKey := range []string{"", "updated_at"} {
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: idempotentKey,
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
Dialect: dialect,
Dialect: sql.SnowflakeDialect{},
SoftDelete: true,
}

Expand Down Expand Up @@ -107,15 +106,14 @@ func TestMergeStatement(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
Dialect: dialect,
Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}

Expand Down Expand Up @@ -156,15 +154,14 @@ func TestMergeStatementIdempotentKey(t *testing.T) {
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "updated_at",
PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
Dialect: dialect,
Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}

Expand Down Expand Up @@ -199,18 +196,17 @@ func TestMergeStatementCompositeKey(t *testing.T) {
_cols.AddColumn(columns.NewColumn("another_id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "updated_at",
PrimaryKeys: []columns.Wrapper{
columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect),
columns.NewWrapper(columns.NewColumn("another_id", typing.Invalid), dialect),
PrimaryKeys: []columns.Column{
columns.NewColumn("id", typing.Invalid),
columns.NewColumn("another_id", typing.Invalid),
},
Columns: &_cols,
DestKind: constants.Snowflake,
Dialect: dialect,
Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}

Expand Down Expand Up @@ -249,18 +245,17 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
PrimaryKeys: []columns.Wrapper{
columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect),
columns.NewWrapper(columns.NewColumn("group", typing.Invalid), dialect),
PrimaryKeys: []columns.Column{
columns.NewColumn("id", typing.Invalid),
columns.NewColumn("group", typing.Invalid),
},
Columns: &_cols,
DestKind: constants.Snowflake,
Dialect: dialect,
Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}

Expand Down
4 changes: 2 additions & 2 deletions lib/destination/dml/merge_valid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
)

func TestMergeArgument_Valid(t *testing.T) {
primaryKeys := []columns.Wrapper{
columns.NewWrapper(columns.NewColumn("id", typing.Integer), sql.SnowflakeDialect{}),
primaryKeys := []columns.Column{
columns.NewColumn("id", typing.Integer),
}

var cols columns.Columns
Expand Down
7 changes: 3 additions & 4 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
Expand Down Expand Up @@ -67,10 +66,10 @@ func (t *TableData) ContainOtherOperations() bool {
return t.containOtherOperations
}

func (t *TableData) PrimaryKeys(dialect sql.Dialect) []columns.Wrapper {
var pks []columns.Wrapper
func (t *TableData) PrimaryKeys() []columns.Column {
var pks []columns.Column
for _, pk := range t.primaryKeys {
pks = append(pks, columns.NewWrapper(columns.NewColumn(pk, typing.Invalid), dialect))
pks = append(pks, columns.NewColumn(pk, typing.Invalid))
}

return pks
Expand Down
25 changes: 0 additions & 25 deletions lib/typing/columns/wrapper.go

This file was deleted.

65 changes: 0 additions & 65 deletions lib/typing/columns/wrapper_test.go

This file was deleted.

0 comments on commit 167b126

Please sign in to comment.