From a9f4e5b2aaed873e19cb82abac0260d5237150c4 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Wed, 1 May 2024 15:40:15 -0700 Subject: [PATCH] Remove uen from ma --- clients/bigquery/bigquery.go | 19 ++---- clients/mssql/staging.go | 16 ++--- clients/mssql/store.go | 4 -- clients/redshift/redshift.go | 4 -- clients/redshift/staging.go | 16 ++--- clients/shared/append.go | 16 ++--- clients/shared/merge.go | 17 ++--- clients/shared/table_config_test.go | 1 - clients/snowflake/snowflake.go | 6 +- clients/snowflake/staging.go | 16 ++--- lib/destination/ddl/ddl.go | 5 -- lib/destination/ddl/ddl_alter_delete_test.go | 14 ---- lib/destination/ddl/ddl_bq_test.go | 64 ++++++++---------- lib/destination/ddl/ddl_create_table_test.go | 30 ++++----- lib/destination/ddl/ddl_sflk_test.go | 43 +++++------- lib/destination/ddl/ddl_temp_test.go | 60 ++++++++--------- lib/destination/dml/merge.go | 5 -- lib/destination/dml/merge_bigquery_test.go | 31 ++++----- lib/destination/dml/merge_mssql_test.go | 18 +++-- lib/destination/dml/merge_parts_test.go | 7 -- lib/destination/dml/merge_test.go | 70 +++++++++----------- lib/destination/dml/merge_valid_test.go | 40 ++++------- lib/destination/dwh.go | 1 - 23 files changed, 196 insertions(+), 307 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 51078d4ce..f17802fc3 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -44,14 +44,13 @@ type Store struct { func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), } if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { @@ -119,10 +118,6 @@ func (s *Store) Dialect() sql.Dialect { return sql.BigQueryDialect{} } -func (s *Store) ShouldUppercaseEscapedNames() bool { - return false -} - func (s *Store) GetClient(ctx context.Context) *bigquery.Client { client, err := bigquery.NewClient(ctx, s.config.BigQuery.ProjectID) if err != nil { diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index f373fbfb6..c70813d1c 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -9,20 +9,18 @@ import ( "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" ) func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), } if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { diff --git a/clients/mssql/store.go b/clients/mssql/store.go index f48c26d40..bff06ab4c 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -39,10 +39,6 @@ func (s *Store) Dialect() sql.Dialect { return sql.DefaultDialect{} } -func (s *Store) ShouldUppercaseEscapedNames() bool { - return false -} - func (s *Store) Merge(tableData *optimization.TableData) error { return shared.Merge(s, tableData, s.config, types.MergeOpts{}) } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 952d89007..65720e005 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -50,10 +50,6 @@ func (s *Store) Dialect() sql.Dialect { return sql.RedshiftDialect{} } -func (s *Store) ShouldUppercaseEscapedNames() bool { - return false -} - func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) { const ( describeNameCol = "column_name" diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index bb6614baa..5c562034c 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -12,21 +12,19 @@ import ( "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/s3lib" ) func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error { // Redshift always creates a temporary table. tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), } if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { diff --git a/clients/shared/append.go b/clients/shared/append.go index 06f1e2172..217758183 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -8,7 +8,6 @@ import ( "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -29,14 +28,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode()) createAlterTableArgs := ddl.AlterTableArgs{ - Dwh: dwh, - Tc: tableConfig, - TableID: tableID, - CreateTable: tableConfig.CreateTable(), - ColumnOp: constants.Add, - CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: dwh, + Tc: tableConfig, + TableID: tableID, + CreateTable: tableConfig.CreateTable(), + ColumnOp: constants.Add, + CdcTime: tableData.LatestCDCTs, + Mode: tableData.Mode(), } // Keys that exist in CDC stream, but not in DWH diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 88c560efa..754c3f140 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -35,14 +35,13 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) createAlterTableArgs := ddl.AlterTableArgs{ - Dwh: dwh, - Tc: tableConfig, - TableID: tableID, - CreateTable: tableConfig.CreateTable(), - ColumnOp: constants.Add, - CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: dwh, + Tc: tableConfig, + TableID: tableID, + CreateTable: tableConfig.CreateTable(), + ColumnOp: constants.Add, + CdcTime: tableData.LatestCDCTs, + Mode: tableData.Mode(), } // Columns that are missing in DWH, but exist in our CDC stream. @@ -60,7 +59,6 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg ColumnOp: constants.Delete, ContainOtherOperations: tableData.ContainOtherOperations(), CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), Mode: tableData.Mode(), } @@ -130,7 +128,6 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg SoftDelete: tableData.TopicConfig().SoftDelete, DestKind: dwh.Label(), Dialect: dwh.Dialect(), - UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), ContainsHardDeletes: ptr.ToBool(tableData.ContainsHardDeletes()), } diff --git a/clients/shared/table_config_test.go b/clients/shared/table_config_test.go index db45a710e..c85b9c053 100644 --- a/clients/shared/table_config_test.go +++ b/clients/shared/table_config_test.go @@ -75,7 +75,6 @@ func (MockDWH) PrepareTemporaryTable(tableData *optimization.TableData, tableCon func (MockDWH) IdentifierFor(topicConfig kafkalib.TopicConfig, name string) types.TableIdentifier { panic("not implemented") } -func (MockDWH) ShouldUppercaseEscapedNames() bool { return true } type MockTableIdentifier struct{ fqName string } diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index eec3b4dda..64de266e0 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -78,11 +78,7 @@ func (s *Store) Label() constants.DestinationKind { } func (s *Store) Dialect() sql.Dialect { - return sql.SnowflakeDialect{UppercaseEscNames: s.ShouldUppercaseEscapedNames()} -} - -func (s *Store) ShouldUppercaseEscapedNames() bool { - return s.config.SharedDestinationConfig.UppercaseEscapedNames + return sql.SnowflakeDialect{UppercaseEscNames: s.config.SharedDestinationConfig.UppercaseEscapedNames} } func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap { diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 2c761ebf9..2528dfae4 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -12,7 +12,6 @@ import ( "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/values" @@ -49,14 +48,13 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts [] func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), } if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index ac2803fe6..72c193811 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -44,7 +44,6 @@ type AlterTableArgs struct { TableID types.TableIdentifier CreateTable bool TemporaryTable bool - UppercaseEscNames *bool ColumnOp constants.ColumnOperation Mode config.Mode @@ -69,10 +68,6 @@ func (a AlterTableArgs) Validate() error { } } - if a.UppercaseEscNames == nil { - return fmt.Errorf("uppercaseEscNames cannot be nil") - } - return nil } diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index 508ae7ed6..aa0f2c699 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -7,8 +7,6 @@ import ( "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" @@ -69,7 +67,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -90,7 +87,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -112,7 +108,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -147,7 +142,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: false, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -168,7 +162,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: false, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -189,7 +182,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: false, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -226,7 +218,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -243,7 +234,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -260,7 +250,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -285,7 +274,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - UppercaseEscNames: ptr.ToBool(true), Mode: config.Replication, } @@ -300,7 +288,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -315,7 +302,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 5985db3cb..e756d0a1f 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -9,8 +9,6 @@ import ( "github.com/artie-labs/transfer/clients/bigquery" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/columns" "github.com/stretchr/testify/assert" @@ -61,7 +59,6 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { ColumnOp: constants.Delete, ContainOtherOperations: true, CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -84,7 +81,6 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { ColumnOp: constants.Delete, ContainOtherOperations: true, CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -134,14 +130,13 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { tc := d.bigQueryStore.GetConfigMap().TableConfig(tableID) for name, kind := range newCols { alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.bigQueryStore, - Tc: tc, - TableID: tableID, - CreateTable: tc.CreateTable(), - ColumnOp: constants.Add, - CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.bigQueryStore, + Tc: tc, + TableID: tableID, + CreateTable: tc.CreateTable(), + ColumnOp: constants.Add, + CdcTime: ts, + Mode: config.Replication, } col := columns.NewColumn(name, kind) @@ -196,14 +191,13 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { // BQ returning the same error because the column already exists. d.fakeBigQueryStore.ExecReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]")) alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.bigQueryStore, - Tc: tc, - TableID: tableID, - CreateTable: tc.CreateTable(), - ColumnOp: constants.Add, - CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.bigQueryStore, + Tc: tc, + TableID: tableID, + CreateTable: tc.CreateTable(), + ColumnOp: constants.Add, + CdcTime: ts, + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(column)) @@ -250,14 +244,13 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfig(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfig(tableID).ReadOnlyColumnsToDelete()) for _, column := range cols.GetColumns() { alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.bigQueryStore, - Tc: tc, - TableID: tableID, - CreateTable: tc.CreateTable(), - ColumnOp: constants.Delete, - CdcTime: ts, - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.bigQueryStore, + Tc: tc, + TableID: tableID, + CreateTable: tc.CreateTable(), + ColumnOp: constants.Delete, + CdcTime: ts, + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(column)) } @@ -268,14 +261,13 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { // Now try to delete again and with an increased TS. It should now be all deleted. for _, column := range cols.GetColumns() { alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.bigQueryStore, - Tc: tc, - TableID: tableID, - CreateTable: tc.CreateTable(), - ColumnOp: constants.Delete, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.bigQueryStore, + Tc: tc, + TableID: tableID, + CreateTable: tc.CreateTable(), + ColumnOp: constants.Delete, + CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(column)) diff --git a/lib/destination/ddl/ddl_create_table_test.go b/lib/destination/ddl/ddl_create_table_test.go index 1746856d6..aada0afbc 100644 --- a/lib/destination/ddl/ddl_create_table_test.go +++ b/lib/destination/ddl/ddl_create_table_test.go @@ -8,8 +8,6 @@ import ( "github.com/artie-labs/transfer/clients/snowflake" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/config/constants" @@ -56,13 +54,12 @@ func (d *DDLTestSuite) Test_CreateTable() { }, } { alterTableArgs := ddl.AlterTableArgs{ - Dwh: dwhTc._dwh, - Tc: dwhTc._tableConfig, - TableID: dwhTc._tableID, - CreateTable: dwhTc._tableConfig.CreateTable(), - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(true), // Will be ignored by BigQuery - Mode: config.Replication, + Dwh: dwhTc._dwh, + Tc: dwhTc._tableConfig, + TableID: dwhTc._tableID, + CreateTable: dwhTc._tableConfig.CreateTable(), + ColumnOp: constants.Add, + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(columns.NewColumn("name", typing.String))) @@ -122,14 +119,13 @@ func (d *DDLTestSuite) TestCreateTable() { tc := d.snowflakeStagesStore.GetConfigMap().TableConfig(tableID) alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: tc, - TableID: tableID, - CreateTable: tc.CreateTable(), - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - UppercaseEscNames: ptr.ToBool(true), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: tc, + TableID: tableID, + CreateTable: tc.CreateTable(), + ColumnOp: constants.Add, + CdcTime: time.Now().UTC(), + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(testCase.cols...), testCase.name) diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index de84b2dda..44708a84c 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -8,8 +8,6 @@ import ( "github.com/artie-labs/transfer/clients/snowflake" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/columns" "github.com/stretchr/testify/assert" @@ -34,13 +32,12 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { tc := d.snowflakeStagesStore.GetConfigMap().TableConfig(tableID) alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: tc, + TableID: tableID, + ColumnOp: constants.Add, + CdcTime: time.Now().UTC(), + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) @@ -68,13 +65,12 @@ func (d *DDLTestSuite) TestAlterIdempotency() { d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: tc, + TableID: tableID, + ColumnOp: constants.Add, + CdcTime: time.Now().UTC(), + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) @@ -98,13 +94,12 @@ func (d *DDLTestSuite) TestAlterTableAdd() { tc := d.snowflakeStagesStore.GetConfigMap().TableConfig(tableID) alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: tc, + TableID: tableID, + ColumnOp: constants.Add, + CdcTime: time.Now().UTC(), + Mode: config.Replication, } assert.NoError(d.T(), alterTableArgs.AlterTable(cols...)) @@ -146,7 +141,6 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { ContainOtherOperations: true, ColumnOp: constants.Delete, CdcTime: time.Now().UTC(), - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } @@ -211,7 +205,6 @@ func (d *DDLTestSuite) TestAlterTableDelete() { ColumnOp: constants.Delete, ContainOtherOperations: true, CdcTime: time.Now(), - UppercaseEscNames: ptr.ToBool(false), Mode: config.Replication, } diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index efa3a6940..3fdd24a2b 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -7,8 +7,6 @@ import ( "github.com/artie-labs/transfer/clients/snowflake" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/config/constants" @@ -20,10 +18,9 @@ import ( func (d *DDLTestSuite) TestValidate_AlterTableArgs() { a := &ddl.AlterTableArgs{ - ColumnOp: constants.Delete, - CreateTable: true, - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + ColumnOp: constants.Delete, + CreateTable: true, + Mode: config.Replication, } assert.Contains(d.T(), a.Validate().Error(), "incompatible operation - cannot drop columns and create table at the same time") @@ -39,15 +36,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfig(tableID) args := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: snowflakeTc, - TableID: tableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - CdcTime: time.Time{}, - UppercaseEscNames: ptr.ToBool(true), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: snowflakeTc, + TableID: tableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + CdcTime: time.Time{}, + Mode: config.Replication, } // No columns. @@ -74,15 +70,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) sflkStageTc := d.snowflakeStagesStore.GetConfigMap().TableConfig(tableID) args := ddl.AlterTableArgs{ - Dwh: d.snowflakeStagesStore, - Tc: sflkStageTc, - TableID: tableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - CdcTime: time.Time{}, - UppercaseEscNames: ptr.ToBool(true), - Mode: config.Replication, + Dwh: d.snowflakeStagesStore, + Tc: sflkStageTc, + TableID: tableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + CdcTime: time.Time{}, + Mode: config.Replication, } assert.NoError(d.T(), args.AlterTable(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String))) @@ -100,15 +95,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) bqTc := d.bigQueryStore.GetConfigMap().TableConfig(tableID) args := ddl.AlterTableArgs{ - Dwh: d.bigQueryStore, - Tc: bqTc, - TableID: tableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - CdcTime: time.Time{}, - UppercaseEscNames: ptr.ToBool(false), - Mode: config.Replication, + Dwh: d.bigQueryStore, + Tc: bqTc, + TableID: tableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + CdcTime: time.Time{}, + Mode: config.Replication, } assert.NoError(d.T(), args.AlterTable(columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String))) diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 0c69a0d82..c17fde831 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -30,7 +30,6 @@ type MergeArgument struct { // ContainsHardDeletes is only used for Redshift and MergeStatementParts, // where we do not issue a DELETE statement if there are no hard deletes in the batch ContainsHardDeletes *bool - UppercaseEscNames *bool Dialect sql.Dialect } @@ -55,10 +54,6 @@ func (m *MergeArgument) Valid() error { return fmt.Errorf("subQuery cannot be empty") } - if m.UppercaseEscNames == nil { - return fmt.Errorf("uppercaseEscNames cannot be nil") - } - if !constants.IsValidDestination(m.DestKind) { return fmt.Errorf("invalid destination: %s", m.DestKind) } diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go index 0f383d35c..726ffa439 100644 --- a/lib/destination/dml/merge_bigquery_test.go +++ b/lib/destination/dml/merge_bigquery_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -18,14 +17,13 @@ func TestMergeStatement_TempTable(t *testing.T) { cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) mergeArg := &MergeArgument{ - TableID: MockTableIdentifier{"customers.orders"}, - SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_id", typing.Invalid), sql.BigQueryDialect{})}, - Columns: &cols, - DestKind: constants.BigQuery, - Dialect: sql.BigQueryDialect{}, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(false), + TableID: MockTableIdentifier{"customers.orders"}, + SubQuery: "customers.orders_tmp", + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_id", typing.Invalid), sql.BigQueryDialect{})}, + Columns: &cols, + DestKind: constants.BigQuery, + Dialect: sql.BigQueryDialect{}, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() @@ -41,14 +39,13 @@ func TestMergeStatement_JSONKey(t *testing.T) { cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) mergeArg := &MergeArgument{ - TableID: MockTableIdentifier{"customers.orders"}, - SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_oid", typing.Invalid), sql.BigQueryDialect{})}, - Columns: &cols, - DestKind: constants.BigQuery, - Dialect: sql.BigQueryDialect{}, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(false), + TableID: MockTableIdentifier{"customers.orders"}, + SubQuery: "customers.orders_tmp", + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_oid", typing.Invalid), sql.BigQueryDialect{})}, + Columns: &cols, + DestKind: constants.BigQuery, + Dialect: sql.BigQueryDialect{}, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() diff --git a/lib/destination/dml/merge_mssql_test.go b/lib/destination/dml/merge_mssql_test.go index 163eab3a1..53174fdad 100644 --- a/lib/destination/dml/merge_mssql_test.go +++ b/lib/destination/dml/merge_mssql_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -42,15 +41,14 @@ func Test_GetMSSQLStatement(t *testing.T) { strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) mergeArg := MergeArgument{ - TableID: MockTableIdentifier{fqTable}, - SubQuery: subQuery, - IdempotentKey: "", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), sql.DefaultDialect{})}, - Columns: &_cols, - DestKind: constants.MSSQL, - Dialect: sql.DefaultDialect{}, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(false), + TableID: MockTableIdentifier{fqTable}, + SubQuery: subQuery, + IdempotentKey: "", + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), sql.DefaultDialect{})}, + Columns: &_cols, + DestKind: constants.MSSQL, + Dialect: sql.DefaultDialect{}, + SoftDelete: false, } mergeSQL, err := mergeArg.GetMSSQLStatement() diff --git a/lib/destination/dml/merge_parts_test.go b/lib/destination/dml/merge_parts_test.go index 0be37e169..ffba67a61 100644 --- a/lib/destination/dml/merge_parts_test.go +++ b/lib/destination/dml/merge_parts_test.go @@ -75,7 +75,6 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) { DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(false), - UppercaseEscNames: ptr.ToBool(false), } parts, err := mergeArg.GetParts() @@ -103,7 +102,6 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) { DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, SoftDelete: true, - UppercaseEscNames: ptr.ToBool(false), ContainsHardDeletes: ptr.ToBool(false), } @@ -144,7 +142,6 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, SoftDelete: true, - UppercaseEscNames: ptr.ToBool(false), ContainsHardDeletes: ptr.ToBool(false), } @@ -188,7 +185,6 @@ func TestMergeStatementParts(t *testing.T) { DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), - UppercaseEscNames: ptr.ToBool(false), } parts, err := mergeArg.GetParts() @@ -216,7 +212,6 @@ func TestMergeStatementParts(t *testing.T) { Dialect: sql.RedshiftDialect{}, IdempotentKey: "created_at", ContainsHardDeletes: ptr.ToBool(true), - UppercaseEscNames: ptr.ToBool(false), } parts, err = mergeArg.GetParts() @@ -248,7 +243,6 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { DestKind: constants.Redshift, Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), - UppercaseEscNames: ptr.ToBool(false), } parts, err := mergeArg.GetParts() @@ -276,7 +270,6 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { Dialect: sql.RedshiftDialect{}, ContainsHardDeletes: ptr.ToBool(true), IdempotentKey: "created_at", - UppercaseEscNames: ptr.ToBool(false), } parts, err = mergeArg.GetParts() diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 5ec119f58..5987fedd7 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -10,7 +10,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -60,15 +59,14 @@ func TestMergeStatementSoftDelete(t *testing.T) { dialect := sql.SnowflakeDialect{UppercaseEscNames: true} for _, idempotentKey := range []string{"", "updated_at"} { mergeArg := MergeArgument{ - TableID: MockTableIdentifier{fqTable}, - SubQuery: subQuery, - IdempotentKey: idempotentKey, - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, - Columns: &_cols, - DestKind: constants.Snowflake, - Dialect: dialect, - SoftDelete: true, - UppercaseEscNames: ptr.ToBool(true), + TableID: MockTableIdentifier{fqTable}, + SubQuery: subQuery, + IdempotentKey: idempotentKey, + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + Columns: &_cols, + DestKind: constants.Snowflake, + Dialect: dialect, + SoftDelete: true, } mergeSQL, err := mergeArg.GetStatement() @@ -111,15 +109,14 @@ func TestMergeStatement(t *testing.T) { dialect := sql.SnowflakeDialect{UppercaseEscNames: true} mergeArg := MergeArgument{ - TableID: MockTableIdentifier{fqTable}, - SubQuery: subQuery, - IdempotentKey: "", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, - Columns: &_cols, - DestKind: constants.Snowflake, - Dialect: dialect, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(true), + TableID: MockTableIdentifier{fqTable}, + SubQuery: subQuery, + IdempotentKey: "", + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + Columns: &_cols, + DestKind: constants.Snowflake, + Dialect: dialect, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() @@ -161,15 +158,14 @@ func TestMergeStatementIdempotentKey(t *testing.T) { dialect := sql.SnowflakeDialect{UppercaseEscNames: true} mergeArg := MergeArgument{ - TableID: MockTableIdentifier{fqTable}, - SubQuery: subQuery, - IdempotentKey: "updated_at", - PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, - Columns: &_cols, - DestKind: constants.Snowflake, - Dialect: dialect, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(true), + TableID: MockTableIdentifier{fqTable}, + SubQuery: subQuery, + IdempotentKey: "updated_at", + PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)}, + Columns: &_cols, + DestKind: constants.Snowflake, + Dialect: dialect, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() @@ -212,11 +208,10 @@ func TestMergeStatementCompositeKey(t *testing.T) { columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect), columns.NewWrapper(columns.NewColumn("another_id", typing.Invalid), dialect), }, - Columns: &_cols, - DestKind: constants.Snowflake, - Dialect: dialect, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(true), + Columns: &_cols, + DestKind: constants.Snowflake, + Dialect: dialect, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() @@ -263,11 +258,10 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect), columns.NewWrapper(columns.NewColumn("group", typing.Invalid), dialect), }, - Columns: &_cols, - DestKind: constants.Snowflake, - Dialect: dialect, - SoftDelete: false, - UppercaseEscNames: ptr.ToBool(true), + Columns: &_cols, + DestKind: constants.Snowflake, + Dialect: dialect, + SoftDelete: false, } mergeSQL, err := mergeArg.GetStatement() diff --git a/lib/destination/dml/merge_valid_test.go b/lib/destination/dml/merge_valid_test.go index 9a3e109a0..cd3fe22d2 100644 --- a/lib/destination/dml/merge_valid_test.go +++ b/lib/destination/dml/merge_valid_test.go @@ -6,7 +6,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -71,48 +70,35 @@ func TestMergeArgument_Valid(t *testing.T) { expectedErr: "subQuery cannot be empty", }, { - name: "did not pass in uppercase esc col", + name: "missing dest kind", mergeArg: &MergeArgument{ PrimaryKeys: primaryKeys, Columns: &cols, - TableID: MockTableIdentifier{"schema.tableName"}, SubQuery: "schema.tableName", - }, - expectedErr: "uppercaseEscNames cannot be nil", - }, - { - name: "missing dest kind", - mergeArg: &MergeArgument{ - PrimaryKeys: primaryKeys, - Columns: &cols, - SubQuery: "schema.tableName", - TableID: MockTableIdentifier{"schema.tableName"}, - UppercaseEscNames: ptr.ToBool(false), + TableID: MockTableIdentifier{"schema.tableName"}, }, expectedErr: "invalid destination", }, { name: "missing dialect kind", mergeArg: &MergeArgument{ - PrimaryKeys: primaryKeys, - Columns: &cols, - SubQuery: "schema.tableName", - TableID: MockTableIdentifier{"schema.tableName"}, - UppercaseEscNames: ptr.ToBool(false), - DestKind: constants.BigQuery, + PrimaryKeys: primaryKeys, + Columns: &cols, + SubQuery: "schema.tableName", + TableID: MockTableIdentifier{"schema.tableName"}, + DestKind: constants.BigQuery, }, expectedErr: "dialect cannot be nil", }, { name: "everything exists", mergeArg: &MergeArgument{ - PrimaryKeys: primaryKeys, - Columns: &cols, - SubQuery: "schema.tableName", - TableID: MockTableIdentifier{"schema.tableName"}, - UppercaseEscNames: ptr.ToBool(false), - DestKind: constants.BigQuery, - Dialect: sql.BigQueryDialect{}, + PrimaryKeys: primaryKeys, + Columns: &cols, + SubQuery: "schema.tableName", + TableID: MockTableIdentifier{"schema.tableName"}, + DestKind: constants.BigQuery, + Dialect: sql.BigQueryDialect{}, }, }, } diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 00a86b959..5685f9f03 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -21,7 +21,6 @@ type DataWarehouse interface { Begin() (*sql.Tx, error) // Helper functions for merge - ShouldUppercaseEscapedNames() bool IsRetryableError(err error) bool IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error)