From 83b3ddd998ef2167401a46fd6530912f33543e24 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 1 May 2024 21:21:06 -0700 Subject: [PATCH 1/7] [typing] Kill `columns.Wrapper` (#543) --- clients/shared/merge.go | 2 +- lib/destination/dml/merge.go | 16 +++--- lib/destination/dml/merge_bigquery_test.go | 4 +- lib/destination/dml/merge_mssql_test.go | 2 +- lib/destination/dml/merge_parts_test.go | 8 +-- lib/destination/dml/merge_test.go | 33 +++++------ lib/destination/dml/merge_valid_test.go | 4 +- lib/optimization/table_data.go | 7 +-- lib/typing/columns/wrapper.go | 25 --------- lib/typing/columns/wrapper_test.go | 65 ---------------------- 10 files changed, 35 insertions(+), 131 deletions(-) delete mode 100644 lib/typing/columns/wrapper.go delete mode 100644 lib/typing/columns/wrapper_test.go diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 754c3f140..0621d00ef 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -123,7 +123,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg TableID: tableID, SubQuery: subQuery, IdempotentKey: tableData.TopicConfig().IdempotentKey, - PrimaryKeys: tableData.PrimaryKeys(dwh.Dialect()), + PrimaryKeys: tableData.PrimaryKeys(), Columns: tableData.ReadOnlyInMemoryCols(), SoftDelete: tableData.TopicConfig().SoftDelete, DestKind: dwh.Label(), diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 2ca9cb393..aad0d1396 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -17,7 +17,7 @@ type MergeArgument struct { TableID types.TableIdentifier SubQuery string IdempotentKey string - PrimaryKeys []columns.Wrapper + PrimaryKeys []columns.Column // AdditionalEqualityStrings is used for handling BigQuery partitioned table merges AdditionalEqualityStrings []string @@ -94,7 +94,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) equalitySQLParts = append(equalitySQLParts, equalitySQL) } @@ -115,7 +115,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].EscapedName()), + m.PrimaryKeys[0].Name(m.Dialect)), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -142,7 +142,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { var pks []string for _, pk := range m.PrimaryKeys { - pks = append(pks, pk.EscapedName()) + pks = append(pks, pk.Name(m.Dialect)) } parts := []string{ @@ -159,7 +159,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].EscapedName()), + m.PrimaryKeys[0].Name(m.Dialect)), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 @@ -207,7 +207,7 @@ func (m *MergeArgument) GetStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName()) if !isOk { return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns) @@ -215,7 +215,7 @@ func (m *MergeArgument) GetStatement() (string, error) { if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. - equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.EscapedName(), primaryKey.EscapedName()) + equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) } equalitySQLParts = append(equalitySQLParts, equalitySQL) @@ -295,7 +295,7 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) equalitySQLParts = append(equalitySQLParts, equalitySQL) } diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go index 726ffa439..355c8229e 100644 --- a/lib/destination/dml/merge_bigquery_test.go +++ b/lib/destination/dml/merge_bigquery_test.go @@ -19,7 +19,7 @@ func TestMergeStatement_TempTable(t *testing.T) { mergeArg := &MergeArgument{ TableID: MockTableIdentifier{"customers.orders"}, SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_id", typing.Invalid), sql.BigQueryDialect{})}, + PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)}, Columns: &cols, DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, @@ -41,7 +41,7 @@ func TestMergeStatement_JSONKey(t *testing.T) { mergeArg := &MergeArgument{ TableID: MockTableIdentifier{"customers.orders"}, SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_oid", typing.Invalid), sql.BigQueryDialect{})}, + PrimaryKeys: []columns.Column{columns.NewColumn("order_oid", typing.Invalid)}, Columns: &cols, DestKind: constants.BigQuery, Dialect: sql.BigQueryDialect{}, diff --git a/lib/destination/dml/merge_mssql_test.go b/lib/destination/dml/merge_mssql_test.go index 505d333da..15613eb7f 100644 --- a/lib/destination/dml/merge_mssql_test.go +++ b/lib/destination/dml/merge_mssql_test.go @@ -44,7 +44,7 @@ func Test_GetMSSQLStatement(t *testing.T) { TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: "", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), sql.MSSQLDialect{})}, + PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, Columns: &_cols, DestKind: constants.MSSQL, Dialect: sql.MSSQLDialect{}, diff --git a/lib/destination/dml/merge_parts_test.go b/lib/destination/dml/merge_parts_test.go index 1b7038a70..7c7f3ce28 100644 --- a/lib/destination/dml/merge_parts_test.go +++ b/lib/destination/dml/merge_parts_test.go @@ -25,7 +25,7 @@ func TestMergeStatementPartsValidation(t *testing.T) { } type result struct { - PrimaryKeys []columns.Wrapper + PrimaryKeys []columns.Column ColumnsToTypes columns.Columns } @@ -47,11 +47,11 @@ func getBasicColumnsForTest(compositeKey bool) result { cols.AddColumn(textToastCol) cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - var pks []columns.Wrapper - pks = append(pks, columns.NewWrapper(idCol, sql.MSSQLDialect{})) + var pks []columns.Column + pks = append(pks, idCol) if compositeKey { - pks = append(pks, columns.NewWrapper(emailCol, sql.MSSQLDialect{})) + pks = append(pks, emailCol) } return result{ diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 2cd93ae9e..312952ea3 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -56,16 +56,15 @@ func TestMergeStatementSoftDelete(t *testing.T) { _cols.AddColumn(columns.NewColumn("id", typing.String)) _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - dialect := sql.SnowflakeDialect{} for _, idempotentKey := range []string{"", "updated_at"} { mergeArg := MergeArgument{ TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: idempotentKey, - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, Columns: &_cols, DestKind: constants.Snowflake, - Dialect: dialect, + Dialect: sql.SnowflakeDialect{}, SoftDelete: true, } @@ -107,15 +106,14 @@ func TestMergeStatement(t *testing.T) { subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - dialect := sql.SnowflakeDialect{} mergeArg := MergeArgument{ TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: "", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, Columns: &_cols, DestKind: constants.Snowflake, - Dialect: dialect, + Dialect: sql.SnowflakeDialect{}, SoftDelete: false, } @@ -156,15 +154,14 @@ func TestMergeStatementIdempotentKey(t *testing.T) { _cols.AddColumn(columns.NewColumn("id", typing.String)) _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - dialect := sql.SnowflakeDialect{} mergeArg := MergeArgument{ TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: "updated_at", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, Columns: &_cols, DestKind: constants.Snowflake, - Dialect: dialect, + Dialect: sql.SnowflakeDialect{}, SoftDelete: false, } @@ -199,18 +196,17 @@ func TestMergeStatementCompositeKey(t *testing.T) { _cols.AddColumn(columns.NewColumn("another_id", typing.String)) _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - dialect := sql.SnowflakeDialect{} mergeArg := MergeArgument{ TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: "updated_at", - PrimaryKeys: []columns.Wrapper{ - columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect), - columns.NewWrapper(columns.NewColumn("another_id", typing.Invalid), dialect), + PrimaryKeys: []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("another_id", typing.Invalid), }, Columns: &_cols, DestKind: constants.Snowflake, - Dialect: dialect, + Dialect: sql.SnowflakeDialect{}, SoftDelete: false, } @@ -249,18 +245,17 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - dialect := sql.SnowflakeDialect{} mergeArg := MergeArgument{ TableID: MockTableIdentifier{fqTable}, SubQuery: subQuery, IdempotentKey: "", - PrimaryKeys: []columns.Wrapper{ - columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect), - columns.NewWrapper(columns.NewColumn("group", typing.Invalid), dialect), + PrimaryKeys: []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("group", typing.Invalid), }, Columns: &_cols, DestKind: constants.Snowflake, - Dialect: dialect, + Dialect: sql.SnowflakeDialect{}, SoftDelete: false, } diff --git a/lib/destination/dml/merge_valid_test.go b/lib/destination/dml/merge_valid_test.go index 71101833b..fc7114762 100644 --- a/lib/destination/dml/merge_valid_test.go +++ b/lib/destination/dml/merge_valid_test.go @@ -13,8 +13,8 @@ import ( ) func TestMergeArgument_Valid(t *testing.T) { - primaryKeys := []columns.Wrapper{ - columns.NewWrapper(columns.NewColumn("id", typing.Integer), sql.SnowflakeDialect{}), + primaryKeys := []columns.Column{ + columns.NewColumn("id", typing.Integer), } var cols columns.Columns diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index 39402b00e..42338aa5f 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -10,7 +10,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/size" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -67,10 +66,10 @@ func (t *TableData) ContainOtherOperations() bool { return t.containOtherOperations } -func (t *TableData) PrimaryKeys(dialect sql.Dialect) []columns.Wrapper { - var pks []columns.Wrapper +func (t *TableData) PrimaryKeys() []columns.Column { + var pks []columns.Column for _, pk := range t.primaryKeys { - pks = append(pks, columns.NewWrapper(columns.NewColumn(pk, typing.Invalid), dialect)) + pks = append(pks, columns.NewColumn(pk, typing.Invalid)) } return pks diff --git a/lib/typing/columns/wrapper.go b/lib/typing/columns/wrapper.go deleted file mode 100644 index 2d79d4845..000000000 --- a/lib/typing/columns/wrapper.go +++ /dev/null @@ -1,25 +0,0 @@ -package columns - -import ( - "github.com/artie-labs/transfer/lib/sql" -) - -type Wrapper struct { - name string - escapedName string -} - -func NewWrapper(col Column, dialect sql.Dialect) Wrapper { - return Wrapper{ - name: col.name, - escapedName: col.Name(dialect), - } -} - -func (w Wrapper) EscapedName() string { - return w.escapedName -} - -func (w Wrapper) RawName() string { - return w.name -} diff --git a/lib/typing/columns/wrapper_test.go b/lib/typing/columns/wrapper_test.go deleted file mode 100644 index 95a54a649..000000000 --- a/lib/typing/columns/wrapper_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package columns - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/artie-labs/transfer/lib/sql" - - "github.com/artie-labs/transfer/lib/typing" -) - -func TestWrapper_Complete(t *testing.T) { - type _testCase struct { - name string - expectedRawName string - expectedEscapedName string - expectedEscapedNameBQ string - } - - testCases := []_testCase{ - { - name: "happy", - expectedRawName: "happy", - expectedEscapedName: `"HAPPY"`, - expectedEscapedNameBQ: "`happy`", - }, - { - name: "user_id", - expectedRawName: "user_id", - expectedEscapedName: `"USER_ID"`, - expectedEscapedNameBQ: "`user_id`", - }, - { - name: "group", - expectedRawName: "group", - expectedEscapedName: `"GROUP"`, - expectedEscapedNameBQ: "`group`", - }, - } - - for _, testCase := range testCases { - // Snowflake escape - w := NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.SnowflakeDialect{}) - - assert.Equal(t, testCase.expectedEscapedName, w.EscapedName(), testCase.name) - assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name) - - // BigQuery escape - w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.BigQueryDialect{}) - - assert.Equal(t, testCase.expectedEscapedNameBQ, w.EscapedName(), testCase.name) - assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name) - - { - w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.SnowflakeDialect{}) - assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name) - } - { - w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.BigQueryDialect{}) - assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name) - } - - } -} From 2291477bc22815334c9ce05ef672d077f95d18e8 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 1 May 2024 23:00:22 -0700 Subject: [PATCH 2/7] [sql] Add `QuoteIdentifiers` function (#545) --- clients/snowflake/snowflake.go | 5 +---- lib/sql/util.go | 8 ++++++++ lib/sql/util_test.go | 5 +++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index 8c62c08be..3cf716612 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -128,10 +128,7 @@ func (s *Store) reestablishConnection() error { } func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { - var primaryKeysEscaped []string - for _, pk := range primaryKeys { - primaryKeysEscaped = append(primaryKeysEscaped, s.Dialect().QuoteIdentifier(pk)) - } + primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, s.Dialect()) orderColsToIterate := primaryKeysEscaped if topicConfig.IncludeArtieUpdatedAt { diff --git a/lib/sql/util.go b/lib/sql/util.go index 9a8150f89..b7dab812e 100644 --- a/lib/sql/util.go +++ b/lib/sql/util.go @@ -13,3 +13,11 @@ import ( func QuoteLiteral(value string) string { return fmt.Sprintf("'%s'", strings.ReplaceAll(stringutil.EscapeBackslashes(value), "'", `\'`)) } + +func QuoteIdentifiers(identifiers []string, dialect Dialect) []string { + result := make([]string, len(identifiers)) + for i, identifier := range identifiers { + result[i] = dialect.QuoteIdentifier(identifier) + } + return result +} diff --git a/lib/sql/util_test.go b/lib/sql/util_test.go index 89ea11320..1be71dfb9 100644 --- a/lib/sql/util_test.go +++ b/lib/sql/util_test.go @@ -38,3 +38,8 @@ func TestQuoteLiteral(t *testing.T) { assert.Equal(t, testCase.expected, QuoteLiteral(testCase.colVal), testCase.name) } } + +func TestQuoteIdentifiers(t *testing.T) { + assert.Equal(t, []string{}, QuoteIdentifiers([]string{}, BigQueryDialect{})) + assert.Equal(t, []string{"`a`", "`b`", "`c`"}, QuoteIdentifiers([]string{"a", "b", "c"}, BigQueryDialect{})) +} From 523f5bda9d5efbb71446a0dcb7942c1c4d451004 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 09:55:22 -0700 Subject: [PATCH 3/7] [typing] Kill `Columns.GetEscapedColumnsToUpdate` (#544) --- clients/snowflake/staging.go | 3 +- lib/destination/dml/merge.go | 64 ++++++++++++------------------ lib/destination/dml/merge_test.go | 33 +++++++++++++++ lib/typing/columns/columns.go | 22 ---------- lib/typing/columns/columns_test.go | 58 --------------------------- 5 files changed, 60 insertions(+), 120 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 2528dfae4..bac39ef83 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -12,6 +12,7 @@ import ( "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" + "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/values" @@ -83,7 +84,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo // COPY the CSV file (in Snowflake) into a table copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)", tempTableID.FullyQualifiedName(), - strings.Join(tableData.ReadOnlyInMemoryCols().GetEscapedColumnsToUpdate(s.Dialect()), ","), + strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","), escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%")) if additionalSettings.AdditionalCopyClause != "" { diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index aad0d1396..7f1cf4649 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -3,6 +3,7 @@ package dml import ( "errors" "fmt" + "slices" "strings" "github.com/artie-labs/transfer/lib/array" @@ -65,6 +66,12 @@ func (m *MergeArgument) Valid() error { return nil } +func removeDeleteColumnMarker(columns []string) ([]string, bool) { + origLength := len(columns) + columns = slices.DeleteFunc(columns, func(col string) bool { return col == constants.DeleteColumnMarker }) + return columns, len(columns) != origLength +} + func (m *MergeArgument) GetParts() ([]string, error) { if err := m.Valid(); err != nil { return nil, err @@ -98,17 +105,17 @@ func (m *MergeArgument) GetParts() ([]string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect) + columns := m.Columns.GetColumnsToUpdate() if m.SoftDelete { return []string{ // INSERT fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) - m.TableID.FullyQualifiedName(), strings.Join(cols, ","), + m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), // SELECT cc.col1, cc.col2, ... FROM staging as CC array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", }), m.SubQuery, @@ -128,14 +135,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // We also need to remove __artie flags since it does not exist in the destination table var removed bool - for idx, col := range cols { - if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) { - cols = append(cols[:idx], cols[idx+1:]...) - removed = true - break - } - } - + columns, removed = removeDeleteColumnMarker(columns) if !removed { return nil, errors.New("artie delete flag doesn't exist") } @@ -149,10 +149,10 @@ func (m *MergeArgument) GetParts() ([]string, error) { // INSERT fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) - m.TableID.FullyQualifiedName(), strings.Join(cols, ","), + m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), // SELECT cc.col1, cc.col2, ... FROM staging as CC array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", }), m.SubQuery, @@ -230,7 +230,7 @@ func (m *MergeArgument) GetStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, m.AdditionalEqualityStrings...) } - cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect) + columns := m.Columns.GetColumnsToUpdate() if m.SoftDelete { return fmt.Sprintf(` @@ -241,9 +241,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Update + Soft Deletion idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), // Insert - constants.DeleteColumnMarker, strings.Join(cols, ","), + constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -251,14 +251,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // We also need to remove __artie flags since it does not exist in the destination table var removed bool - for idx, col := range cols { - if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) { - cols = append(cols[:idx], cols[idx+1:]...) - removed = true - break - } - } - + columns, removed = removeDeleteColumnMarker(columns) if !removed { return "", errors.New("artie delete flag doesn't exist") } @@ -274,9 +267,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);` // Update constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), // Insert - constants.DeleteColumnMarker, strings.Join(cols, ","), + constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -299,7 +292,7 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } - cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect) + columns := m.Columns.GetColumnsToUpdate() if m.SoftDelete { return fmt.Sprintf(` @@ -311,9 +304,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, // Update + Soft Deletion idempotentClause, m.Columns.UpdateQuery(m.Dialect, false), // Insert - constants.DeleteColumnMarker, strings.Join(cols, ","), + constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil @@ -321,14 +314,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`, // We also need to remove __artie flags since it does not exist in the destination table var removed bool - for idx, col := range cols { - if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) { - cols = append(cols[:idx], cols[idx+1:]...) - removed = true - break - } - } - + columns, removed = removeDeleteColumnMarker(columns) if !removed { return "", errors.New("artie delete flag doesn't exist") } @@ -345,9 +331,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`, // Update constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true), // Insert - constants.DeleteColumnMarker, strings.Join(cols, ","), + constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: cols, + Vals: sql.QuoteIdentifiers(columns, m.Dialect), Separator: ",", Prefix: "cc.", })), nil diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 312952ea3..0f0081bf3 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -32,6 +32,39 @@ func (m MockTableIdentifier) FullyQualifiedName() string { return m.fqName } +func TestRemoveDeleteColumnMarker(t *testing.T) { + { + columns, removed := removeDeleteColumnMarker([]string{}) + assert.Empty(t, columns) + assert.False(t, removed) + } + { + columns, removed := removeDeleteColumnMarker([]string{"a"}) + assert.Equal(t, []string{"a"}, columns) + assert.False(t, removed) + } + { + columns, removed := removeDeleteColumnMarker([]string{"a", "b"}) + assert.Equal(t, []string{"a", "b"}, columns) + assert.False(t, removed) + } + { + columns, removed := removeDeleteColumnMarker([]string{constants.DeleteColumnMarker}) + assert.True(t, removed) + assert.Empty(t, columns) + } + { + columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b"}) + assert.True(t, removed) + assert.Equal(t, []string{"a", "b"}, columns) + } + { + columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b", constants.DeleteColumnMarker, "c"}) + assert.True(t, removed) + assert.Equal(t, []string{"a", "b", "c"}, columns) + } +} + func TestMergeStatementSoftDelete(t *testing.T) { // No idempotent key fqTable := "database.schema.table" diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index a6b752a5c..ecbbd1aaf 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -194,28 +194,6 @@ func (c *Columns) GetColumnsToUpdate() []string { return cols } -// GetEscapedColumnsToUpdate will filter all the `Invalid` columns so that we do not update it. -// It will escape the returned columns. -func (c *Columns) GetEscapedColumnsToUpdate(dialect sql.Dialect) []string { - if c == nil { - return []string{} - } - - c.RLock() - defer c.RUnlock() - - var cols []string - for _, col := range c.columns { - if col.KindDetails == typing.Invalid { - continue - } - - cols = append(cols, col.Name(dialect)) - } - - return cols -} - func (c *Columns) GetColumns() []Column { if c == nil { return []Column{} diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 80e392b47..3e711143b 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -229,64 +229,6 @@ func TestColumns_GetColumnsToUpdate(t *testing.T) { } } -func TestColumns_GetEscapedColumnsToUpdate(t *testing.T) { - type _testCase struct { - name string - cols []Column - expectedColsEsc []string - expectedColsEscBq []string - } - - var ( - happyPathCols = []Column{ - { - name: "hi", - KindDetails: typing.String, - }, - { - name: "bye", - KindDetails: typing.String, - }, - { - name: "start", - KindDetails: typing.String, - }, - } - ) - - extraCols := happyPathCols - for i := 0; i < 100; i++ { - extraCols = append(extraCols, Column{ - name: fmt.Sprintf("hello_%v", i), - KindDetails: typing.Invalid, - }) - } - - testCases := []_testCase{ - { - name: "happy path", - cols: happyPathCols, - expectedColsEsc: []string{`"HI"`, `"BYE"`, `"START"`}, - expectedColsEscBq: []string{"`hi`", "`bye`", "`start`"}, - }, - { - name: "happy path + extra col", - cols: extraCols, - expectedColsEsc: []string{`"HI"`, `"BYE"`, `"START"`}, - expectedColsEscBq: []string{"`hi`", "`bye`", "`start`"}, - }, - } - - for _, testCase := range testCases { - cols := &Columns{ - columns: testCase.cols, - } - - assert.Equal(t, testCase.expectedColsEsc, cols.GetEscapedColumnsToUpdate(sql.SnowflakeDialect{}), testCase.name) - assert.Equal(t, testCase.expectedColsEscBq, cols.GetEscapedColumnsToUpdate(sql.BigQueryDialect{}), testCase.name) - } -} - func TestColumns_UpsertColumns(t *testing.T) { keys := []string{"a", "b", "c", "d", "e"} var cols Columns From 2d8ecb3b37742370688ebf0a3d23319afe4896dd Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 10:55:10 -0700 Subject: [PATCH 4/7] [typing] Rename `Column.Name` (#546) Signed-off-by: Nathan <148575555+nathan-artie@users.noreply.github.com> --- clients/shared/utils.go | 2 +- lib/destination/ddl/ddl.go | 4 ++-- lib/destination/ddl/ddl_bq_test.go | 6 +++--- lib/destination/ddl/ddl_sflk_test.go | 4 ++-- lib/destination/dml/merge.go | 14 +++++++------- lib/typing/columns/columns.go | 5 ++--- lib/typing/columns/columns_test.go | 4 ++-- 7 files changed, 19 insertions(+), 20 deletions(-) diff --git a/clients/shared/utils.go b/clients/shared/utils.go index 664fcf7ff..606f87ce1 100644 --- a/clients/shared/utils.go +++ b/clients/shared/utils.go @@ -30,7 +30,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col return fmt.Errorf("failed to escape default value: %w", err) } - escapedCol := column.Name(dwh.Dialect()) + escapedCol := column.EscapedName(dwh.Dialect()) // TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause. // Once we escape everything by default, we can remove this patch of code. diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 72c193811..886352527 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -99,7 +99,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error { mutateCol = append(mutateCol, col) switch a.ColumnOp { case constants.Add: - colName := col.Name(a.Dwh.Dialect()) + colName := col.EscapedName(a.Dwh.Dialect()) if col.PrimaryKey() && a.Mode != config.History { // Don't create a PK for history mode because it's append-only, so the primary key should not be enforced. @@ -108,7 +108,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error { colSQLParts = append(colSQLParts, fmt.Sprintf(`%s %s`, colName, typing.KindToDWHType(col.KindDetails, a.Dwh.Label(), col.PrimaryKey()))) case constants.Delete: - colSQLParts = append(colSQLParts, col.Name(a.Dwh.Dialect())) + colSQLParts = append(colSQLParts, col.EscapedName(a.Dwh.Dialect())) } } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index e756d0a1f..fe4a47448 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -86,7 +86,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, column.Name(d.bigQueryStore.Dialect())), query) + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, column.EscapedName(d.bigQueryStore.Dialect())), query) callIdx += 1 } @@ -143,7 +143,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { assert.NoError(d.T(), alterTableArgs.AlterTable(col)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, col.Name(d.bigQueryStore.Dialect()), + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, col.EscapedName(d.bigQueryStore.Dialect()), typing.KindToDWHType(kind, d.bigQueryStore.Label(), false)), query) callIdx += 1 } @@ -202,7 +202,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, column.Name(d.bigQueryStore.Dialect()), + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, column.EscapedName(d.bigQueryStore.Dialect()), typing.KindToDWHType(column.KindDetails, d.bigQueryStore.Label(), false)), query) callIdx += 1 } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 80c2b5ded..65f9926b2 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -42,7 +42,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { for i := 0; i < len(cols); i++ { execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`, - cols[i].Name(d.snowflakeStagesStore.Dialect()), + cols[i].EscapedName(d.snowflakeStagesStore.Dialect()), typing.KindToDWHType(cols[i].KindDetails, d.snowflakeStagesStore.Label(), false)), execQuery) } @@ -172,7 +172,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { execArg, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) assert.Equal(d.T(), execArg, fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", `shop.public."USERS"`, constants.Delete, - cols[i].Name(d.snowflakeStagesStore.Dialect()))) + cols[i].EscapedName(d.snowflakeStagesStore.Dialect()))) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 7f1cf4649..c75e5579e 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -101,7 +101,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) equalitySQLParts = append(equalitySQLParts, equalitySQL) } @@ -122,7 +122,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].Name(m.Dialect)), + m.PrimaryKeys[0].EscapedName(m.Dialect)), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -142,7 +142,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { var pks []string for _, pk := range m.PrimaryKeys { - pks = append(pks, pk.Name(m.Dialect)) + pks = append(pks, pk.EscapedName(m.Dialect)) } parts := []string{ @@ -159,7 +159,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].Name(m.Dialect)), + m.PrimaryKeys[0].EscapedName(m.Dialect)), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 @@ -207,7 +207,7 @@ func (m *MergeArgument) GetStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName()) if !isOk { return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns) @@ -215,7 +215,7 @@ func (m *MergeArgument) GetStatement() (string, error) { if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. - equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) + equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) } equalitySQLParts = append(equalitySQLParts, equalitySQL) @@ -288,7 +288,7 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect)) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) equalitySQLParts = append(equalitySQLParts, equalitySQL) } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index ecbbd1aaf..23eab0705 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -83,8 +83,7 @@ func (c *Column) RawName() string { return c.name } -// Name will give you c.name and escape it if necessary. -func (c *Column) Name(dialect sql.Dialect) string { +func (c *Column) EscapedName(dialect sql.Dialect) string { return dialect.QuoteIdentifier(c.name) } @@ -245,7 +244,7 @@ func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { continue } - colName := column.Name(dialect) + colName := column.EscapedName(dialect) if column.ToastColumn { if column.KindDetails == typing.Struct { cols = append(cols, processToastStructCol(colName, dialect)) diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 3e711143b..4610977c5 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -170,8 +170,8 @@ func TestColumn_Name(t *testing.T) { assert.Equal(t, testCase.expectedName, col.RawName(), testCase.colName) - assert.Equal(t, testCase.expectedNameEsc, col.Name(sql.SnowflakeDialect{}), testCase.colName) - assert.Equal(t, testCase.expectedNameEscBq, col.Name(sql.BigQueryDialect{}), testCase.colName) + assert.Equal(t, testCase.expectedNameEsc, col.EscapedName(sql.SnowflakeDialect{}), testCase.colName) + assert.Equal(t, testCase.expectedNameEscBq, col.EscapedName(sql.BigQueryDialect{}), testCase.colName) } } From 11a602e590b70edd42c89a89d5246c680f0ac067 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 10:59:26 -0700 Subject: [PATCH 5/7] [typing] Rename `Column.RawName` to `Column.Name` (#547) --- clients/shared/merge.go | 4 ++-- clients/shared/utils.go | 2 +- clients/snowflake/ddl_test.go | 10 +++++----- lib/cdc/mysql/debezium_test.go | 4 ++-- lib/cdc/util/relational_event_test.go | 4 ++-- lib/destination/ddl/ddl.go | 2 +- lib/destination/ddl/ddl_bq_test.go | 6 +++--- lib/destination/ddl/ddl_sflk_test.go | 12 ++++++------ lib/destination/dml/merge.go | 4 ++-- lib/destination/types/table_config.go | 8 ++++---- lib/optimization/event_update_test.go | 8 ++++---- lib/optimization/table_data.go | 4 ++-- lib/optimization/table_data_test.go | 2 +- lib/parquetutil/generate_schema.go | 2 +- lib/typing/columns/columns.go | 6 +++--- lib/typing/columns/columns_test.go | 2 +- lib/typing/columns/diff.go | 10 +++++----- lib/typing/columns/diff_test.go | 2 +- models/event/event_save_test.go | 10 +++++----- 19 files changed, 51 insertions(+), 51 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 0621d00ef..7b3f06927 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -93,7 +93,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg for attempts := 0; attempts < backfillMaxRetries; attempts++ { backfillErr = BackfillColumn(cfg, dwh, col, tableID) if backfillErr == nil { - tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{ + tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ Backfilled: ptr.ToBool(true), }) break @@ -110,7 +110,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg } if backfillErr != nil { - return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), backfillErr) + return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.Name(), col.RawDefaultValue(), backfillErr) } } diff --git a/clients/shared/utils.go b/clients/shared/utils.go index 606f87ce1..64f9faa82 100644 --- a/clients/shared/utils.go +++ b/clients/shared/utils.go @@ -45,7 +45,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col tableID.FullyQualifiedName(), escapedCol, defaultVal, additionalEscapedCol, ) slog.Info("Backfilling column", - slog.String("colName", column.RawName()), + slog.String("colName", column.Name()), slog.String("query", query), slog.String("table", tableID.FullyQualifiedName()), ) diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index c2e8a3e77..61c607576 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -41,7 +41,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { nameCol := columns.NewColumn("name", typing.String) tc := s.stageStore.configMap.TableConfig(tableID) - val := tc.ShouldDeleteColumn(nameCol.RawName(), time.Now().Add(-1*6*time.Hour), true) + val := tc.ShouldDeleteColumn(nameCol.Name(), time.Now().Add(-1*6*time.Hour), true) assert.False(s.T(), val, "should not try to delete this column") assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableID).ReadOnlyColumnsToDelete()), 1) @@ -68,23 +68,23 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { nameCol := columns.NewColumn("name", typing.String) // Let's try to delete name. - allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), + allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now().Add(-1*(6*time.Hour)), true) assert.Equal(s.T(), allowed, false, "should not be allowed to delete") // Process tried to delete, but it's lagged. - allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), + allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now().Add(-1*(6*time.Hour)), true) assert.Equal(s.T(), allowed, false, "should not be allowed to delete") // Process now caught up, and is asking if we can delete, should still be no. - allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), time.Now(), true) + allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now(), true) assert.Equal(s.T(), allowed, false, "should not be allowed to delete still") // Process is finally ahead, has permission to delete now. - allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), + allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now().Add(2*constants.DeletionConfidencePadding), true) assert.Equal(s.T(), allowed, true, "should now be allowed to delete") diff --git a/lib/cdc/mysql/debezium_test.go b/lib/cdc/mysql/debezium_test.go index 783ab83c8..49f6fb760 100644 --- a/lib/cdc/mysql/debezium_test.go +++ b/lib/cdc/mysql/debezium_test.go @@ -353,7 +353,7 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() { col, isOk := cols.GetColumn("abcdef") assert.True(m.T(), isOk) - assert.Equal(m.T(), "abcdef", col.RawName()) + assert.Equal(m.T(), "abcdef", col.Name()) for key := range evtData { if strings.Contains(key, constants.ArtiePrefix) { continue @@ -361,6 +361,6 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() { col, isOk = cols.GetColumn(strings.ToLower(key)) assert.Equal(m.T(), true, isOk, key) - assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.RawName(), key)) + assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.Name(), key)) } } diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go index e5a421ed7..72d903864 100644 --- a/lib/cdc/util/relational_event_test.go +++ b/lib/cdc/util/relational_event_test.go @@ -74,8 +74,8 @@ func TestSource_GetOptionalSchema(t *testing.T) { for _, _col := range cols.GetColumns() { // All the other columns do not have a default value. - if _col.RawName() != "boolean_column" { - assert.Nil(t, _col.RawDefaultValue(), _col.RawName()) + if _col.Name() != "boolean_column" { + assert.Nil(t, _col.RawDefaultValue(), _col.Name()) } } } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 886352527..c8f0b864b 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -91,7 +91,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error { } if a.ColumnOp == constants.Delete { - if !a.Tc.ShouldDeleteColumn(col.RawName(), a.CdcTime, a.ContainOtherOperations) { + if !a.Tc.ShouldDeleteColumn(col.Name(), a.CdcTime, a.ContainOtherOperations) { continue } } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index fe4a47448..123fcc86c 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -152,10 +152,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns()) // Check by iterating over the columns for _, column := range d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns() { - existingCol, isOk := existingCols.GetColumn(column.RawName()) + existingCol, isOk := existingCols.GetColumn(column.Name()) if !isOk { // Check new cols? - existingCol.KindDetails, isOk = newCols[column.RawName()] + existingCol.KindDetails, isOk = newCols[column.Name()] } assert.True(d.T(), isOk) @@ -211,7 +211,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { assert.Equal(d.T(), existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns()) // Check by iterating over the columns for _, column := range d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns() { - existingCol, isOk := existingCols.GetColumn(column.RawName()) + existingCol, isOk := existingCols.GetColumn(column.Name()) assert.True(d.T(), isOk) assert.Equal(d.T(), column.KindDetails, existingCol.KindDetails) } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 65f9926b2..352efba2a 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -108,15 +108,15 @@ func (d *DDLTestSuite) TestAlterTableAdd() { for _, column := range tableConfig.Columns().GetColumns() { var found bool for _, expCol := range cols { - if found = column.RawName() == expCol.RawName(); found { - assert.Equal(d.T(), column.KindDetails, expCol.KindDetails, fmt.Sprintf("wrong col kind, col: %s", column.RawName())) + if found = column.Name() == expCol.Name(); found { + assert.Equal(d.T(), column.KindDetails, expCol.KindDetails, fmt.Sprintf("wrong col kind, col: %s", column.Name())) break } } assert.True(d.T(), found, fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", - column.RawName(), tableConfig.Columns(), cols)) + column.Name(), tableConfig.Columns(), cols)) } } @@ -150,7 +150,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { for col := range tableConfig.ReadOnlyColumnsToDelete() { var found bool for _, expCol := range cols { - if found = col == expCol.RawName(); found { + if found = col == expCol.Name(); found { break } } @@ -161,7 +161,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { } for i := 0; i < len(cols); i++ { - colToActuallyDelete := cols[i].RawName() + colToActuallyDelete := cols[i].Name() // Now let's check the timestamp assert.True(d.T(), tableConfig.ReadOnlyColumnsToDelete()[colToActuallyDelete].After(time.Now())) // Now let's actually try to dial the time back, and it should actually try to delete. @@ -214,7 +214,7 @@ func (d *DDLTestSuite) TestAlterTableDelete() { for col := range tableConfig.ReadOnlyColumnsToDelete() { var found bool for _, expCol := range cols { - if found = col == expCol.RawName(); found { + if found = col == expCol.Name(); found { break } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index c75e5579e..e40f1d890 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -208,9 +208,9 @@ func (m *MergeArgument) GetStatement() (string, error) { for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) - pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName()) + pkCol, isOk := m.Columns.GetColumn(primaryKey.Name()) if !isOk { - return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns) + return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.Name(), m.Columns) } if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index 755f5b66c..79d6515a0 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -64,15 +64,15 @@ func (d *DwhTableConfig) MutateInMemoryColumns(createTable bool, columnOp consta for _, col := range cols { d.columns.AddColumn(col) // Delete from the permissions table, if exists. - delete(d.columnsToDelete, col.RawName()) + delete(d.columnsToDelete, col.Name()) } d.createTable = createTable case constants.Delete: for _, col := range cols { // Delete from the permissions and in-memory table - d.columns.DeleteColumn(col.RawName()) - delete(d.columnsToDelete, col.RawName()) + d.columns.DeleteColumn(col.Name()) + delete(d.columnsToDelete, col.Name()) } } } @@ -91,7 +91,7 @@ func (d *DwhTableConfig) AuditColumnsToDelete(colsToDelete []columns.Column) { for colName := range d.columnsToDelete { var found bool for _, col := range colsToDelete { - if found = col.RawName() == colName; found { + if found = col.Name() == colName; found { break } } diff --git a/lib/optimization/event_update_test.go b/lib/optimization/event_update_test.go index 2d5fa5e78..a61b04e2e 100644 --- a/lib/optimization/event_update_test.go +++ b/lib/optimization/event_update_test.go @@ -74,16 +74,16 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { // Testing backfill for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { - assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) + assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.Name()) } backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean) backfilledCol.SetBackfilled(true) assert.NoError(t, tableData.MergeColumnsFromDestination(backfilledCol)) for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { - if inMemoryCol.RawName() == backfilledCol.RawName() { - assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) + if inMemoryCol.Name() == backfilledCol.Name() { + assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.Name()) } else { - assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) + assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.Name()) } } diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index 42338aa5f..09017077e 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -257,9 +257,9 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro var foundColumn columns.Column var found bool for _, destCol := range destCols { - if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) { + if destCol.Name() == strings.ToLower(inMemoryCol.Name()) { if destCol.KindDetails.Kind == typing.Invalid.Kind { - return fmt.Errorf("column %q is invalid", destCol.RawName()) + return fmt.Errorf("column %q is invalid", destCol.Name()) } foundColumn = destCol diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go index 74cea7f52..5a4e8f449 100644 --- a/lib/optimization/table_data_test.go +++ b/lib/optimization/table_data_test.go @@ -145,7 +145,7 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) { assert.True(t, isOk) extCol.KindDetails.ExtendedTimeDetails.Format = time.RFC3339Nano - tableData.inMemoryColumns.UpdateColumn(columns.NewColumn(extCol.RawName(), extCol.KindDetails)) + tableData.inMemoryColumns.UpdateColumn(columns.NewColumn(extCol.Name(), extCol.KindDetails)) for name, colKindDetails := range map[string]typing.KindDetails{ "foo": typing.String, diff --git a/lib/parquetutil/generate_schema.go b/lib/parquetutil/generate_schema.go index dfbbf1490..8f02657d2 100644 --- a/lib/parquetutil/generate_schema.go +++ b/lib/parquetutil/generate_schema.go @@ -12,7 +12,7 @@ func GenerateJSONSchema(columns []columns.Column) (string, error) { var fields []typing.Field for _, column := range columns { // We don't need to escape the column name here. - field, err := column.KindDetails.ParquetAnnotation(column.RawName()) + field, err := column.KindDetails.ParquetAnnotation(column.Name()) if err != nil { return "", err } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 23eab0705..c7c71053a 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -79,7 +79,7 @@ func (c *Column) ShouldBackfill() bool { return c.defaultValue != nil && !c.backfilled } -func (c *Column) RawName() string { +func (c *Column) Name() string { return c.name } @@ -187,7 +187,7 @@ func (c *Columns) GetColumnsToUpdate() []string { continue } - cols = append(cols, col.RawName()) + cols = append(cols, col.Name()) } return cols @@ -240,7 +240,7 @@ func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { } // skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete. - if skipDeleteCol && column.RawName() == constants.DeleteColumnMarker { + if skipDeleteCol && column.Name() == constants.DeleteColumnMarker { continue } diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 4610977c5..525ee12e3 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -168,7 +168,7 @@ func TestColumn_Name(t *testing.T) { name: testCase.colName, } - assert.Equal(t, testCase.expectedName, col.RawName(), testCase.colName) + assert.Equal(t, testCase.expectedName, col.Name(), testCase.colName) assert.Equal(t, testCase.expectedNameEsc, col.EscapedName(sql.SnowflakeDialect{}), testCase.colName) assert.Equal(t, testCase.expectedNameEscBq, col.EscapedName(sql.BigQueryDialect{}), testCase.colName) diff --git a/lib/typing/columns/diff.go b/lib/typing/columns/diff.go index ac22c27d5..f717bca1f 100644 --- a/lib/typing/columns/diff.go +++ b/lib/typing/columns/diff.go @@ -40,7 +40,7 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo targ := CloneColumns(columnsInDestination) var colsToDelete []Column for _, col := range src.GetColumns() { - _, isOk := targ.GetColumn(col.RawName()) + _, isOk := targ.GetColumn(col.Name()) if isOk { colsToDelete = append(colsToDelete, col) @@ -49,13 +49,13 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo // We cannot delete inside a for-loop that is iterating over src.GetColumns() because we are messing up the array order. for _, colToDelete := range colsToDelete { - src.DeleteColumn(colToDelete.RawName()) - targ.DeleteColumn(colToDelete.RawName()) + src.DeleteColumn(colToDelete.Name()) + targ.DeleteColumn(colToDelete.Name()) } var targetColumnsMissing Columns for _, col := range src.GetColumns() { - if shouldSkipColumn(col.RawName(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) { + if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) { continue } @@ -64,7 +64,7 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo var sourceColumnsMissing Columns for _, col := range targ.GetColumns() { - if shouldSkipColumn(col.RawName(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) { + if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) { continue } diff --git a/lib/typing/columns/diff_test.go b/lib/typing/columns/diff_test.go index 93a24e90e..8609cedeb 100644 --- a/lib/typing/columns/diff_test.go +++ b/lib/typing/columns/diff_test.go @@ -226,7 +226,7 @@ func TestDiffDeterministic(t *testing.T) { var key string for _, targetKeyMissing := range targetKeysMissing { - key += targetKeyMissing.RawName() + key += targetKeyMissing.Name() } retMap[key] = false diff --git a/models/event/event_save_test.go b/models/event/event_save_test.go index d37b7f2d0..6c7c98f01 100644 --- a/models/event/event_save_test.go +++ b/models/event/event_save_test.go @@ -48,7 +48,7 @@ func (e *EventsTestSuite) TestSaveEvent() { // Check the in-memory DB columns. var found int for _, col := range optimization.ReadOnlyInMemoryCols().GetColumns() { - if col.RawName() == expectedLowerCol || col.RawName() == anotherLowerCol { + if col.Name() == expectedLowerCol || col.Name() == anotherLowerCol { found += 1 } @@ -183,16 +183,16 @@ func (e *EventsTestSuite) TestEvent_SaveColumnsNoData() { td := e.db.GetOrCreateTableData("non_existent") var prevKey string for _, col := range td.ReadOnlyInMemoryCols().GetColumns() { - if col.RawName() == constants.DeleteColumnMarker { + if col.Name() == constants.DeleteColumnMarker { continue } if prevKey == "" { - prevKey = col.RawName() + prevKey = col.Name() continue } - currentKeyParsed, err := strconv.Atoi(col.RawName()) + currentKeyParsed, err := strconv.Atoi(col.Name()) assert.NoError(e.T(), err) prevKeyParsed, err := strconv.Atoi(prevKey) @@ -206,7 +206,7 @@ func (e *EventsTestSuite) TestEvent_SaveColumnsNoData() { evt.Columns.AddColumn(columns.NewColumn("foo", typing.Invalid)) var index int for idx, col := range evt.Columns.GetColumns() { - if col.RawName() == "foo" { + if col.Name() == "foo" { index = idx } } From 4e0736a173025f3af6f2594a3285e7b4b0712a80 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 11:43:06 -0700 Subject: [PATCH 6/7] [typing] Inline `Column.EscapedName` calls (#542) --- clients/shared/utils.go | 2 +- lib/destination/ddl/ddl.go | 4 +-- lib/destination/ddl/ddl_bq_test.go | 6 ++-- lib/destination/ddl/ddl_sflk_test.go | 5 ++-- lib/destination/dml/merge.go | 20 ++++++++----- lib/typing/columns/columns.go | 6 +--- lib/typing/columns/columns_test.go | 43 ---------------------------- 7 files changed, 23 insertions(+), 63 deletions(-) diff --git a/clients/shared/utils.go b/clients/shared/utils.go index 64f9faa82..45e971ccf 100644 --- a/clients/shared/utils.go +++ b/clients/shared/utils.go @@ -30,7 +30,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col return fmt.Errorf("failed to escape default value: %w", err) } - escapedCol := column.EscapedName(dwh.Dialect()) + escapedCol := dwh.Dialect().QuoteIdentifier(column.Name()) // TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause. // Once we escape everything by default, we can remove this patch of code. diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index c8f0b864b..675dc5941 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -99,7 +99,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error { mutateCol = append(mutateCol, col) switch a.ColumnOp { case constants.Add: - colName := col.EscapedName(a.Dwh.Dialect()) + colName := a.Dwh.Dialect().QuoteIdentifier(col.Name()) if col.PrimaryKey() && a.Mode != config.History { // Don't create a PK for history mode because it's append-only, so the primary key should not be enforced. @@ -108,7 +108,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error { colSQLParts = append(colSQLParts, fmt.Sprintf(`%s %s`, colName, typing.KindToDWHType(col.KindDetails, a.Dwh.Label(), col.PrimaryKey()))) case constants.Delete: - colSQLParts = append(colSQLParts, col.EscapedName(a.Dwh.Dialect())) + colSQLParts = append(colSQLParts, a.Dwh.Dialect().QuoteIdentifier(col.Name())) } } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 123fcc86c..babea1977 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -86,7 +86,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, column.EscapedName(d.bigQueryStore.Dialect())), query) + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name())), query) callIdx += 1 } @@ -143,7 +143,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { assert.NoError(d.T(), alterTableArgs.AlterTable(col)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, col.EscapedName(d.bigQueryStore.Dialect()), + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()), typing.KindToDWHType(kind, d.bigQueryStore.Label(), false)), query) callIdx += 1 } @@ -202,7 +202,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { assert.NoError(d.T(), alterTableArgs.AlterTable(column)) query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, column.EscapedName(d.bigQueryStore.Dialect()), + assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()), typing.KindToDWHType(column.KindDetails, d.bigQueryStore.Label(), false)), query) callIdx += 1 } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 352efba2a..deae9dfd8 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -42,7 +42,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { for i := 0; i < len(cols); i++ { execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`, - cols[i].EscapedName(d.snowflakeStagesStore.Dialect()), + d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), typing.KindToDWHType(cols[i].KindDetails, d.snowflakeStagesStore.Label(), false)), execQuery) } @@ -172,7 +172,8 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { execArg, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) assert.Equal(d.T(), execArg, fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", `shop.public."USERS"`, constants.Delete, - cols[i].EscapedName(d.snowflakeStagesStore.Dialect()))) + d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), + )) } } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index e40f1d890..2c5b67eb2 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -101,7 +101,8 @@ func (m *MergeArgument) GetParts() ([]string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) + quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) equalitySQLParts = append(equalitySQLParts, equalitySQL) } @@ -122,7 +123,8 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].EscapedName(m.Dialect)), + m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), + ), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -142,7 +144,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { var pks []string for _, pk := range m.PrimaryKeys { - pks = append(pks, pk.EscapedName(m.Dialect)) + pks = append(pks, m.Dialect.QuoteIdentifier(pk.Name())) } parts := []string{ @@ -159,7 +161,8 @@ func (m *MergeArgument) GetParts() ([]string, error) { // LEFT JOIN table on pk(s) m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) - m.PrimaryKeys[0].EscapedName(m.Dialect)), + m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), + ), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 @@ -207,7 +210,9 @@ func (m *MergeArgument) GetStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) + quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) + + equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) pkCol, isOk := m.Columns.GetColumn(primaryKey.Name()) if !isOk { return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.Name(), m.Columns) @@ -215,7 +220,7 @@ func (m *MergeArgument) GetStatement() (string, error) { if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. - equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) + equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", quotedPrimaryKey, quotedPrimaryKey) } equalitySQLParts = append(equalitySQLParts, equalitySQL) @@ -288,7 +293,8 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) { var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { // We'll need to escape the primary key as well. - equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(m.Dialect), primaryKey.EscapedName(m.Dialect)) + quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) equalitySQLParts = append(equalitySQLParts, equalitySQL) } diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index c7c71053a..19ffef0c3 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -83,10 +83,6 @@ func (c *Column) Name() string { return c.name } -func (c *Column) EscapedName(dialect sql.Dialect) string { - return dialect.QuoteIdentifier(c.name) -} - type Columns struct { columns []Column sync.RWMutex @@ -244,7 +240,7 @@ func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string { continue } - colName := column.EscapedName(dialect) + colName := dialect.QuoteIdentifier(column.Name()) if column.ToastColumn { if column.KindDetails == typing.Struct { cols = append(cols, processToastStructCol(colName, dialect)) diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 525ee12e3..0ed03b152 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -132,49 +132,6 @@ func TestColumn_ShouldBackfill(t *testing.T) { } } -func TestColumn_Name(t *testing.T) { - type _testCase struct { - colName string - expectedName string - // Snowflake - expectedNameEsc string - // BigQuery - expectedNameEscBq string - } - - testCases := []_testCase{ - { - colName: "start", - expectedName: "start", - expectedNameEsc: `"START"`, // since this is a reserved word. - expectedNameEscBq: "`start`", // BQ escapes via backticks. - }, - { - colName: "foo", - expectedName: "foo", - expectedNameEsc: `"FOO"`, - expectedNameEscBq: "`foo`", - }, - { - colName: "bar", - expectedName: "bar", - expectedNameEsc: `"BAR"`, - expectedNameEscBq: "`bar`", - }, - } - - for _, testCase := range testCases { - col := &Column{ - name: testCase.colName, - } - - assert.Equal(t, testCase.expectedName, col.Name(), testCase.colName) - - assert.Equal(t, testCase.expectedNameEsc, col.EscapedName(sql.SnowflakeDialect{}), testCase.colName) - assert.Equal(t, testCase.expectedNameEscBq, col.EscapedName(sql.BigQueryDialect{}), testCase.colName) - } -} - func TestColumns_GetColumnsToUpdate(t *testing.T) { type _testCase struct { name string From a152157b6634d2e657cf6acbdf4bef8dd9abfbb6 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 2 May 2024 11:52:16 -0700 Subject: [PATCH 7/7] Change Artie domain to artie.com (#548) --- README.md | 20 ++++++++++---------- examples/mongodb/README.md | 4 ++-- examples/pubsub_postgres/README.md | 4 ++-- lib/cdc/mongo/debezium_test.go | 4 ++-- lib/cdc/util/relational_data_test.go | 2 +- lib/optimization/event_bench_test.go | 2 +- lib/size/size_bench_test.go | 2 +- lib/telemetry/README.md | 2 +- 8 files changed, 20 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 896669539..ea1c4f8d9 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,12 @@
⚡️ Blazing fast data replication between OLTP and OLAP databases ⚡️
- - + +