diff --git a/clients/shared/table_config.go b/clients/shared/table_config.go index 5a71b8ef9..b08ccedbb 100644 --- a/clients/shared/table_config.go +++ b/clients/shared/table_config.go @@ -47,19 +47,17 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) { } }() - var tableMissing bool if err != nil { if g.Dwh.Dialect().IsTableDoesNotExistErr(err) { // This branch is currently only used by Snowflake. // Swallow the error, make sure all the metadata is created - tableMissing = true err = nil } else { return nil, fmt.Errorf("failed to query %T, err: %w, query: %q", g.Dwh, err, g.Query) } } - var cols columns.Columns + var cols []columns.Column for rows != nil && rows.Next() { // figure out what columns were returned // the column names will be the JSON object field keys @@ -126,15 +124,10 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) { return nil, fmt.Errorf("unknown default value strategy: %q", strategy) } - cols.AddColumn(col) + cols = append(cols, col) } - // Do it this way via rows.Next() because that will move the iterator and cause us to miss a column. - if len(cols.GetColumns()) == 0 { - tableMissing = true - } - - tableCfg := types.NewDwhTableConfig(&cols, tableMissing, g.DropDeletedColumns) + tableCfg := types.NewDwhTableConfig(cols, g.DropDeletedColumns) g.ConfigMap.AddTableToConfig(g.TableID, tableCfg) return tableCfg, nil } diff --git a/clients/shared/table_config_test.go b/clients/shared/table_config_test.go index e896c4a01..4136e3198 100644 --- a/clients/shared/table_config_test.go +++ b/clients/shared/table_config_test.go @@ -13,16 +13,16 @@ import ( func TestGetTableConfig(t *testing.T) { // Return early because table is found in configMap. - cols := &columns.Columns{} + var cols []columns.Column for i := range 100 { - cols.AddColumn(columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid)) + cols = append(cols, columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid)) } cm := &types.DwhToTablesConfigMap{} fakeTableID := &mocks.FakeTableIdentifier{} fakeTableID.FullyQualifiedNameReturns("dusty_the_mini_aussie") - tableCfg := types.NewDwhTableConfig(cols, false, false) + tableCfg := types.NewDwhTableConfig(cols, false) cm.AddTableToConfig(fakeTableID, tableCfg) actualTableCfg, err := GetTableCfgArgs{ diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index efb64153c..beef8215b 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -19,7 +19,7 @@ import ( func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { tableID := NewTableIdentifier("coffee_shop", "public", "orders") - var cols columns.Columns + var cols []columns.Column for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, @@ -27,10 +27,10 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { "name": typing.String, "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), } { - cols.AddColumn(columns.NewColumn(colName, kindDetails)) + cols = append(cols, columns.NewColumn(colName, kindDetails)) } - s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) + s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(cols, true)) nameCol := columns.NewColumn("name", typing.String) tc := s.stageStore.configMap.TableConfigCache(tableID) @@ -46,7 +46,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { tableID := NewTableIdentifier("coffee_shop", "orders", "public") - var cols columns.Columns + var cols []columns.Column for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, @@ -54,10 +54,10 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { "name": typing.String, "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), } { - cols.AddColumn(columns.NewColumn(colName, kindDetails)) + cols = append(cols, columns.NewColumn(colName, kindDetails)) } - config := types.NewDwhTableConfig(&cols, false, true) + config := types.NewDwhTableConfig(cols, true) s.stageStore.configMap.AddTableToConfig(tableID, config) nameCol := columns.NewColumn("name", typing.String) @@ -85,7 +85,7 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { } func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() { - var cols columns.Columns + var cols []columns.Column for colName, kindDetails := range map[string]typing.KindDetails{ "id": typing.Integer, "customer_id": typing.Integer, @@ -93,10 +93,10 @@ func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() { "name": typing.String, "created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""), } { - cols.AddColumn(columns.NewColumn(colName, kindDetails)) + cols = append(cols, columns.NewColumn(colName, kindDetails)) } - tc := types.NewDwhTableConfig(&cols, false, false) + tc := types.NewDwhTableConfig(cols, false) tc.SetColumnsToDelete(map[string]time.Time{"customer_id": time.Now()}) assert.Equal(s.T(), len(tc.ReadOnlyColumnsToDelete()), 1) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 3c0f25dfb..64fb4f401 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -72,12 +72,12 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() { constants.OnlySetDeleteColumnMarker: typing.Boolean, } - var anotherCols columns.Columns + var anotherCols []columns.Column for colName, kindDetails := range anotherColToKindDetailsMap { - anotherCols.AddColumn(columns.NewColumn(colName, kindDetails)) + anotherCols = append(anotherCols, columns.NewColumn(colName, kindDetails)) } - s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, false, true)) + s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(anotherCols, true)) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) _col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name") @@ -122,7 +122,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { tableData.InsertRow(pk, row, false) } - s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, false, true)) + s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(cols.GetColumns(), true)) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) } @@ -170,7 +170,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() { tableID := s.identifierFor(tableData) fqName := tableID.FullyQualifiedName() - s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) + s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), true)) err := s.stageStore.Merge(context.Background(), tableData) assert.Nil(s.T(), err) s.fakeStageStore.ExecReturns(nil, nil) @@ -252,7 +252,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { } sflkCols.AddColumn(columns.NewColumn("new", typing.String)) - _config := types.NewDwhTableConfig(&sflkCols, false, true) + _config := types.NewDwhTableConfig(sflkCols.GetColumns(), true) s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 90181278f..a0b93498a 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -153,7 +153,7 @@ func generateTableData(rows int) (TableIdentifier, *optimization.TableData) { func (s *SnowflakeTestSuite) TestPrepareTempTable() { tempTableID, tableData := generateTableData(10) tempTableName := tempTableID.FullyQualifiedName() - s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(nil, true)) sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID) { diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index db062fb4d..1bb8a343d 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -44,13 +44,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Testing 3 scenarios here // 1. DropDeletedColumns = false, ContainOtherOperations = true, don't delete ever. - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, false)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), false)) bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, false)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), false)) redshiftTc := d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, false)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), false)) snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) @@ -119,13 +119,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) // 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete @@ -193,13 +193,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) // 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp. - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index db6748896..2094c682f 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -43,7 +43,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name()) fqName := tableID.FullyQualifiedName() originalColumnLength := len(cols.GetColumns()) - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), true)) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete @@ -119,7 +119,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { existingCols.AddColumn(columns.NewColumn(colName, kindDetails)) } - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true)) // Prior to adding, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) @@ -177,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { existingCols.AddColumn(columns.NewColumn(colName, kindDetails)) } - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true)) // Prior to adding, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) @@ -235,7 +235,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name()) originalColumnLength := len(columnNameToKindDetailsMap) - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, false)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), false)) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete diff --git a/lib/destination/ddl/ddl_create_table_test.go b/lib/destination/ddl/ddl_create_table_test.go index a665ec47c..1591d1452 100644 --- a/lib/destination/ddl/ddl_create_table_test.go +++ b/lib/destination/ddl/ddl_create_table_test.go @@ -21,10 +21,10 @@ import ( func (d *DDLTestSuite) Test_CreateTable() { bqTableID := bigquery.NewTableIdentifier("", "mock_dataset", "mock_table") - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(nil, true)) snowflakeTableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(nil, true)) type dwhToTableConfig struct { _tableID sql.TableIdentifier @@ -115,7 +115,7 @@ func (d *DDLTestSuite) TestCreateTable() { for index, testCase := range testCases { tableID := snowflake.NewTableIdentifier("demo", "public", "experiments") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 3f435eb7b..c9246acb8 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -26,7 +26,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { } tableID := snowflake.NewTableIdentifier("shop", "public", "complex_columns") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ @@ -58,7 +58,7 @@ func (d *DDLTestSuite) TestAlterIdempotency() { } tableID := snowflake.NewTableIdentifier("shop", "public", "orders") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) @@ -88,7 +88,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { } tableID := snowflake.NewTableIdentifier("shop", "public", "orders") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ @@ -130,7 +130,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { } tableID := snowflake.NewTableIdentifier("shop", "public", "users") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -190,7 +190,7 @@ func (d *DDLTestSuite) TestAlterTableDelete() { tableID := snowflake.NewTableIdentifier("shop", "public", "users1") - tableCfg := types.NewDwhTableConfig(&columns.Columns{}, false, true) + tableCfg := types.NewDwhTableConfig(nil, true) tableCfg.SetColumnsToDelete(map[string]time.Time{ "col_to_delete": time.Now().Add(-2 * constants.DeletionConfidencePadding), "answers": time.Now().Add(-2 * constants.DeletionConfidencePadding), diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index 7a582f81d..ada4bc6c0 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -35,7 +35,7 @@ func (d *DDLTestSuite) TestValidate_AlterTableArgs() { func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { tableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -55,7 +55,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { assert.ErrorContains(d.T(), args.AlterTable(d.snowflakeStagesStore), "incompatible operation - cannot drop columns and create table at the same time") // Change it to SFLK + Stage - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) snowflakeStagesTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args.Dialect = d.snowflakeStagesStore.Dialect() args.Tc = snowflakeStagesTc @@ -69,7 +69,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { // Snowflake Stage tableID := snowflake.NewTableIdentifier("db", "schema", "tempTableName") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) sflkStageTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -94,7 +94,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { { // BigQuery tableID := bigquery.NewTableIdentifier("db", "schema", "tempTableName") - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.bigQueryStore.Dialect(), diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index cfec85fea..4f3c0f7d7 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -21,11 +21,11 @@ type DwhTableConfig struct { sync.RWMutex } -func NewDwhTableConfig(columns *columns.Columns, createTable, dropDeletedColumns bool) *DwhTableConfig { +func NewDwhTableConfig(cols []columns.Column, dropDeletedColumns bool) *DwhTableConfig { return &DwhTableConfig{ - columns: columns, + columns: columns.NewColumns(cols), columnsToDelete: make(map[string]time.Time), - createTable: createTable, + createTable: len(cols) == 0, dropDeletedColumns: dropDeletedColumns, } } diff --git a/lib/destination/types/table_config_test.go b/lib/destination/types/table_config_test.go index efd331f0b..a41b7a7ea 100644 --- a/lib/destination/types/table_config_test.go +++ b/lib/destination/types/table_config_test.go @@ -20,7 +20,7 @@ import ( func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { // Test 3 different possibilities: // 1. DropDeletedColumns = false, so don't delete. - dwhTableConfig := types.NewDwhTableConfig(&columns.Columns{}, false, false) + dwhTableConfig := types.NewDwhTableConfig(nil, false) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true) assert.False(t, results) @@ -28,7 +28,7 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { } // 2. DropDeletedColumns = true and ContainsOtherOperations = false, so don't delete - dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true) + dwhTableConfig = types.NewDwhTableConfig(nil, true) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), false) assert.False(t, results) @@ -36,7 +36,7 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { } // 3. DropDeletedColumns = true and ContainsOtherOperations = true, now check CDC time to delete. - dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true) + dwhTableConfig = types.NewDwhTableConfig(nil, true) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true) assert.False(t, results) @@ -49,12 +49,12 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { // TestDwhTableConfig_ColumnsConcurrency this file is meant to test the concurrency methods of .Columns() // In this test, we spin up 5 parallel Go-routines each making 100 calls to .Columns() and assert the validity of the data. func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { - var cols columns.Columns - cols.AddColumn(columns.NewColumn("foo", typing.Struct)) - cols.AddColumn(columns.NewColumn("bar", typing.String)) - cols.AddColumn(columns.NewColumn("boolean", typing.Boolean)) + var cols []columns.Column + cols = append(cols, columns.NewColumn("foo", typing.Struct)) + cols = append(cols, columns.NewColumn("bar", typing.String)) + cols = append(cols, columns.NewColumn("boolean", typing.Boolean)) - dwhTableCfg := types.NewDwhTableConfig(&cols, false, false) + dwhTableCfg := types.NewDwhTableConfig(cols, false) var wg sync.WaitGroup for i := 0; i < 5; i++ { @@ -78,7 +78,7 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { } func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { - tc := types.NewDwhTableConfig(&columns.Columns{}, false, false) + tc := types.NewDwhTableConfig(nil, false) for _, col := range []string{"a", "b", "c", "d", "e"} { tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String)) } @@ -106,7 +106,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { } func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) { - tc := types.NewDwhTableConfig(nil, false, false) + tc := types.NewDwhTableConfig(nil, false) colsToDelete := make(map[string]time.Time) for _, colToDelete := range []string{"a", "b", "c", "d"} { colsToDelete[colToDelete] = time.Now() @@ -154,7 +154,7 @@ func TestAuditColumnsToDelete(t *testing.T) { } for idx, tc := range tcs { - dwhTc := types.NewDwhTableConfig(nil, false, tc.dropDeletedCols) + dwhTc := types.NewDwhTableConfig(nil, tc.dropDeletedCols) colsToDelete := make(map[string]time.Time) for _, colToDelete := range colsToDeleteList { colsToDelete[colToDelete] = time.Now() diff --git a/lib/destination/types/types_test.go b/lib/destination/types/types_test.go index 00faf8afd..f15e8e517 100644 --- a/lib/destination/types/types_test.go +++ b/lib/destination/types/types_test.go @@ -16,12 +16,12 @@ import ( ) func generateDwhTableCfg() *types.DwhTableConfig { - cols := &columns.Columns{} + var cols []columns.Column for _, col := range []string{"a", "b", "c", "d"} { - cols.AddColumn(columns.NewColumn(col, typing.String)) + cols = append(cols, columns.NewColumn(col, typing.String)) } - tableCfg := types.NewDwhTableConfig(cols, false, false) + tableCfg := types.NewDwhTableConfig(cols, false) colsToDelete := make(map[string]time.Time) for _, col := range []string{"foo", "bar", "abc", "xyz"} { colsToDelete[col] = time.Now() diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go index 4d29ae82b..d9d5fad78 100644 --- a/lib/typing/columns/columns.go +++ b/lib/typing/columns/columns.go @@ -101,6 +101,12 @@ type Columns struct { sync.RWMutex } +func NewColumns(columns []Column) *Columns { + return &Columns{ + columns: columns, + } +} + type UpsertColumnArg struct { ToastCol *bool PrimaryKey *bool