diff --git a/clients/shared/table_config.go b/clients/shared/table_config.go index 03698150b..5a71b8ef9 100644 --- a/clients/shared/table_config.go +++ b/clients/shared/table_config.go @@ -134,7 +134,7 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) { tableMissing = true } - tableCfg := types.NewDwhTableConfig(&cols, nil, tableMissing, g.DropDeletedColumns) + tableCfg := types.NewDwhTableConfig(&cols, tableMissing, g.DropDeletedColumns) g.ConfigMap.AddTableToConfig(g.TableID, tableCfg) return tableCfg, nil } diff --git a/clients/shared/table_config_test.go b/clients/shared/table_config_test.go index afbe2c56c..e896c4a01 100644 --- a/clients/shared/table_config_test.go +++ b/clients/shared/table_config_test.go @@ -18,11 +18,12 @@ func TestGetTableConfig(t *testing.T) { cols.AddColumn(columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid)) } - dwhTableCfg := types.NewDwhTableConfig(cols, nil, false, false) cm := &types.DwhToTablesConfigMap{} fakeTableID := &mocks.FakeTableIdentifier{} fakeTableID.FullyQualifiedNameReturns("dusty_the_mini_aussie") - cm.AddTableToConfig(fakeTableID, dwhTableCfg) + + tableCfg := types.NewDwhTableConfig(cols, false, false) + cm.AddTableToConfig(fakeTableID, tableCfg) actualTableCfg, err := GetTableCfgArgs{ Dwh: &mocks.FakeDataWarehouse{}, @@ -31,5 +32,5 @@ func TestGetTableConfig(t *testing.T) { }.GetTableConfig() assert.NoError(t, err) - assert.Equal(t, dwhTableCfg, actualTableCfg) + assert.Equal(t, tableCfg, actualTableCfg) } diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index 62fff39d7..efb64153c 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -4,20 +4,16 @@ import ( "fmt" "time" - "github.com/artie-labs/transfer/lib/config" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" - + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/stretchr/testify/assert" - - "github.com/artie-labs/transfer/lib/typing" ) func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { @@ -34,9 +30,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } - config := types.NewDwhTableConfig(&cols, nil, false, true) - - s.stageStore.configMap.AddTableToConfig(tableID, config) + s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) nameCol := columns.NewColumn("name", typing.String) tc := s.stageStore.configMap.TableConfigCache(tableID) @@ -63,7 +57,7 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } - config := types.NewDwhTableConfig(&cols, nil, false, true) + config := types.NewDwhTableConfig(&cols, false, true) s.stageStore.configMap.AddTableToConfig(tableID, config) nameCol := columns.NewColumn("name", typing.String) @@ -102,9 +96,8 @@ func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() { cols.AddColumn(columns.NewColumn(colName, kindDetails)) } - tc := types.NewDwhTableConfig(&cols, map[string]time.Time{ - "customer_id": time.Now(), - }, false, false) + tc := types.NewDwhTableConfig(&cols, false, false) + tc.SetColumnsToDelete(map[string]time.Time{"customer_id": time.Now()}) assert.Equal(s.T(), len(tc.ReadOnlyColumnsToDelete()), 1) assert.False(s.T(), tc.ShouldDeleteColumn("customer_id", diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 06dfed4ad..3c0f25dfb 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -77,7 +77,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() { anotherCols.AddColumn(columns.NewColumn(colName, kindDetails)) } - s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true)) + s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, false, true)) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) _col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name") @@ -122,7 +122,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { tableData.InsertRow(pk, row, false) } - s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true)) + s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, false, true)) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) } @@ -170,7 +170,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() { tableID := s.identifierFor(tableData) fqName := tableID.FullyQualifiedName() - s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true)) + s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) err := s.stageStore.Merge(context.Background(), tableData) assert.Nil(s.T(), err) s.fakeStageStore.ExecReturns(nil, nil) @@ -252,7 +252,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { } sflkCols.AddColumn(columns.NewColumn("new", typing.String)) - _config := types.NewDwhTableConfig(&sflkCols, nil, false, true) + _config := types.NewDwhTableConfig(&sflkCols, false, true) s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config) assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 37502ddfd..90181278f 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -153,7 +153,7 @@ func generateTableData(rows int) (TableIdentifier, *optimization.TableData) { func (s *SnowflakeTestSuite) TestPrepareTempTable() { tempTableID, tableData := generateTableData(10) tempTableName := tempTableID.FullyQualifiedName() - s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID) { diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index c7aca2c7b..db062fb4d 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -5,8 +5,9 @@ import ( "strings" "time" - "github.com/artie-labs/transfer/lib/config" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" @@ -14,7 +15,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/stretchr/testify/assert" ) func (d *DDLTestSuite) TestAlterDelete_Complete() { @@ -44,13 +44,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Testing 3 scenarios here // 1. DropDeletedColumns = false, ContainOtherOperations = true, don't delete ever. - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, false)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, false)) bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, false)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, false)) redshiftTc := d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, false)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, false)) snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) @@ -119,13 +119,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) // 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true)) bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true)) redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true)) snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete @@ -193,13 +193,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) // 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp. - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true)) bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID) - d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true)) redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID) - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true)) snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID) // Prior to deletion, there should be no colsToDelete diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 8ae054065..db6748896 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -6,19 +6,17 @@ import ( "fmt" "time" - "github.com/artie-labs/transfer/clients/bigquery" - "github.com/artie-labs/transfer/lib/config" - - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/bigquery" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" ) func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { @@ -45,7 +43,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name()) fqName := tableID.FullyQualifiedName() originalColumnLength := len(cols.GetColumns()) - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true)) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete @@ -121,7 +119,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { existingCols.AddColumn(columns.NewColumn(colName, kindDetails)) } - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, nil, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true)) // Prior to adding, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) @@ -179,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { existingCols.AddColumn(columns.NewColumn(colName, kindDetails)) } - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, nil, false, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true)) // Prior to adding, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) @@ -237,7 +235,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name()) originalColumnLength := len(columnNameToKindDetailsMap) - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, false)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, false)) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete diff --git a/lib/destination/ddl/ddl_create_table_test.go b/lib/destination/ddl/ddl_create_table_test.go index 54a1422cf..a665ec47c 100644 --- a/lib/destination/ddl/ddl_create_table_test.go +++ b/lib/destination/ddl/ddl_create_table_test.go @@ -21,10 +21,10 @@ import ( func (d *DDLTestSuite) Test_CreateTable() { bqTableID := bigquery.NewTableIdentifier("", "mock_dataset", "mock_table") - d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) snowflakeTableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) type dwhToTableConfig struct { _tableID sql.TableIdentifier @@ -115,7 +115,7 @@ func (d *DDLTestSuite) TestCreateTable() { for index, testCase := range testCases { tableID := snowflake.NewTableIdentifier("demo", "public", "experiments") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 572aedacd..3f435eb7b 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -26,7 +26,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { } tableID := snowflake.NewTableIdentifier("shop", "public", "complex_columns") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ @@ -58,7 +58,7 @@ func (d *DDLTestSuite) TestAlterIdempotency() { } tableID := snowflake.NewTableIdentifier("shop", "public", "orders") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) @@ -88,7 +88,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { } tableID := snowflake.NewTableIdentifier("shop", "public", "orders") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ @@ -130,7 +130,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { } tableID := snowflake.NewTableIdentifier("shop", "public", "users") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -190,11 +190,14 @@ func (d *DDLTestSuite) TestAlterTableDelete() { tableID := snowflake.NewTableIdentifier("shop", "public", "users1") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, map[string]time.Time{ + tableCfg := types.NewDwhTableConfig(&columns.Columns{}, false, true) + tableCfg.SetColumnsToDelete(map[string]time.Time{ "col_to_delete": time.Now().Add(-2 * constants.DeletionConfidencePadding), "answers": time.Now().Add(-2 * constants.DeletionConfidencePadding), "start": time.Now().Add(-2 * constants.DeletionConfidencePadding), - }, false, true)) + }) + + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, tableCfg) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) alterTableArgs := ddl.AlterTableArgs{ diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index 516f573ef..7a582f81d 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -35,7 +35,7 @@ func (d *DDLTestSuite) TestValidate_AlterTableArgs() { func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { tableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -55,7 +55,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() { assert.ErrorContains(d.T(), args.AlterTable(d.snowflakeStagesStore), "incompatible operation - cannot drop columns and create table at the same time") // Change it to SFLK + Stage - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) snowflakeStagesTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args.Dialect = d.snowflakeStagesStore.Dialect() args.Tc = snowflakeStagesTc @@ -69,7 +69,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { // Snowflake Stage tableID := snowflake.NewTableIdentifier("db", "schema", "tempTableName") - d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) sflkStageTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.snowflakeStagesStore.Dialect(), @@ -94,7 +94,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { { // BigQuery tableID := bigquery.NewTableIdentifier("db", "schema", "tempTableName") - d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true)) + d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true)) bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) args := ddl.AlterTableArgs{ Dialect: d.bigQueryStore.Dialect(), diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index 79d6515a0..cfec85fea 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -21,19 +21,23 @@ type DwhTableConfig struct { sync.RWMutex } -func NewDwhTableConfig(columns *columns.Columns, colsToDelete map[string]time.Time, createTable, dropDeletedColumns bool) *DwhTableConfig { - if len(colsToDelete) == 0 { - colsToDelete = make(map[string]time.Time) - } - +func NewDwhTableConfig(columns *columns.Columns, createTable, dropDeletedColumns bool) *DwhTableConfig { return &DwhTableConfig{ columns: columns, - columnsToDelete: colsToDelete, + columnsToDelete: make(map[string]time.Time), createTable: createTable, dropDeletedColumns: dropDeletedColumns, } } +// SetColumnsToDelete - This is used by tests only +func (d *DwhTableConfig) SetColumnsToDelete(cols map[string]time.Time) { + d.Lock() + defer d.Unlock() + + d.columnsToDelete = cols +} + func (d *DwhTableConfig) CreateTable() bool { d.RLock() defer d.RUnlock() diff --git a/lib/destination/types/table_config_test.go b/lib/destination/types/table_config_test.go index 00562d74e..efd331f0b 100644 --- a/lib/destination/types/table_config_test.go +++ b/lib/destination/types/table_config_test.go @@ -20,7 +20,7 @@ import ( func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { // Test 3 different possibilities: // 1. DropDeletedColumns = false, so don't delete. - dwhTableConfig := types.NewDwhTableConfig(&columns.Columns{}, nil, false, false) + dwhTableConfig := types.NewDwhTableConfig(&columns.Columns{}, false, false) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true) assert.False(t, results) @@ -28,7 +28,7 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { } // 2. DropDeletedColumns = true and ContainsOtherOperations = false, so don't delete - dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, nil, false, true) + dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), false) assert.False(t, results) @@ -36,7 +36,7 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) { } // 3. DropDeletedColumns = true and ContainsOtherOperations = true, now check CDC time to delete. - dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, nil, false, true) + dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true) for i := 0; i < 100; i++ { results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true) assert.False(t, results) @@ -54,7 +54,7 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { cols.AddColumn(columns.NewColumn("bar", typing.String)) cols.AddColumn(columns.NewColumn("boolean", typing.Boolean)) - dwhTableCfg := types.NewDwhTableConfig(&cols, nil, false, false) + dwhTableCfg := types.NewDwhTableConfig(&cols, false, false) var wg sync.WaitGroup for i := 0; i < 5; i++ { @@ -78,7 +78,7 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { } func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { - tc := types.NewDwhTableConfig(&columns.Columns{}, nil, false, false) + tc := types.NewDwhTableConfig(&columns.Columns{}, false, false) for _, col := range []string{"a", "b", "c", "d", "e"} { tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String)) } @@ -106,12 +106,14 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { } func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) { + tc := types.NewDwhTableConfig(nil, false, false) colsToDelete := make(map[string]time.Time) for _, colToDelete := range []string{"a", "b", "c", "d"} { colsToDelete[colToDelete] = time.Now() } - tc := types.NewDwhTableConfig(nil, colsToDelete, false, false) + tc.SetColumnsToDelete(colsToDelete) + var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) @@ -152,12 +154,14 @@ func TestAuditColumnsToDelete(t *testing.T) { } for idx, tc := range tcs { + dwhTc := types.NewDwhTableConfig(nil, false, tc.dropDeletedCols) colsToDelete := make(map[string]time.Time) for _, colToDelete := range colsToDeleteList { colsToDelete[colToDelete] = time.Now() } - dwhTc := types.NewDwhTableConfig(nil, colsToDelete, false, tc.dropDeletedCols) + dwhTc.SetColumnsToDelete(colsToDelete) + var cols []columns.Column for _, colToDelete := range tc.colsToDelete { cols = append(cols, columns.NewColumn(colToDelete, typing.String)) diff --git a/lib/destination/types/types_test.go b/lib/destination/types/types_test.go index 4e2e614f0..00faf8afd 100644 --- a/lib/destination/types/types_test.go +++ b/lib/destination/types/types_test.go @@ -17,16 +17,18 @@ import ( func generateDwhTableCfg() *types.DwhTableConfig { cols := &columns.Columns{} + for _, col := range []string{"a", "b", "c", "d"} { + cols.AddColumn(columns.NewColumn(col, typing.String)) + } + + tableCfg := types.NewDwhTableConfig(cols, false, false) colsToDelete := make(map[string]time.Time) for _, col := range []string{"foo", "bar", "abc", "xyz"} { colsToDelete[col] = time.Now() } - for _, col := range []string{"a", "b", "c", "d"} { - cols.AddColumn(columns.NewColumn(col, typing.String)) - } - - return types.NewDwhTableConfig(cols, colsToDelete, false, false) + tableCfg.SetColumnsToDelete(colsToDelete) + return tableCfg } func TestDwhToTablesConfigMap_TableConfigBasic(t *testing.T) {