diff --git a/clients/shared/append.go b/clients/shared/append.go index 660efac99..994f4840a 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -33,7 +33,6 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim ) tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - if tableConfig.CreateTable() { if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil { return fmt.Errorf("failed to create table: %w", err) diff --git a/clients/shared/table.go b/clients/shared/table.go index 52c707ae7..79b711fc9 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -25,6 +25,6 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData * } // Update cache with the new columns that we've added. - tc.MutateInMemoryColumns(false, constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...) + tc.MutateInMemoryColumns(constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...) return nil } diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index b9cd38ea5..ba55759b2 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -39,7 +39,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() { assert.Equal(s.T(), len(s.stageStore.configMap.TableConfigCache(tableID).ReadOnlyColumnsToDelete()), 1) // Now let's try to add this column back, it should delete it from the cache. - tc.MutateInMemoryColumns(false, constants.Add, nameCol) + tc.MutateInMemoryColumns(constants.Add, nameCol) assert.Equal(s.T(), len(s.stageStore.configMap.TableConfigCache(tableID).ReadOnlyColumnsToDelete()), 0) } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index e3c3075f7..dbada5cb6 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -149,7 +149,7 @@ func (a AlterTableArgs) AlterTable(dwh destination.DataWarehouse, cols ...column } // createTable = false since it all successfully updated. - a.Tc.MutateInMemoryColumns(false, a.ColumnOp, mutateCol...) + a.Tc.MutateInMemoryColumns(a.ColumnOp, mutateCol...) return nil } diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index d517a47c3..1c11142bd 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -67,7 +67,7 @@ func (d *DwhTableConfig) Columns() *columns.Columns { return d.columns } -func (d *DwhTableConfig) MutateInMemoryColumns(createTable bool, columnOp constants.ColumnOperation, cols ...columns.Column) { +func (d *DwhTableConfig) MutateInMemoryColumns(columnOp constants.ColumnOperation, cols ...columns.Column) { d.Lock() defer d.Unlock() switch columnOp { @@ -78,7 +78,8 @@ func (d *DwhTableConfig) MutateInMemoryColumns(createTable bool, columnOp consta delete(d.columnsToDelete, col.Name()) } - d.createTable = createTable + // If we're adding columns, then the table should have either been created or already exists. + d.createTable = false case constants.Delete: for _, col := range cols { // Delete from the permissions and in-memory table diff --git a/lib/destination/types/table_config_test.go b/lib/destination/types/table_config_test.go index 01b9f2ec3..58e309657 100644 --- a/lib/destination/types/table_config_test.go +++ b/lib/destination/types/table_config_test.go @@ -82,7 +82,7 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { tc := types.NewDwhTableConfig(nil, false) for _, col := range []string{"a", "b", "c", "d", "e"} { - tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String)) + tc.MutateInMemoryColumns(constants.Add, columns.NewColumn(col, typing.String)) } assert.Len(t, tc.GetColumns(), 5) @@ -91,7 +91,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { wg.Add(1) go func(colName string) { defer wg.Done() - tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(colName, typing.String)) + tc.MutateInMemoryColumns(constants.Add, columns.NewColumn(colName, typing.String)) }(addCol) } @@ -99,7 +99,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { wg.Add(1) go func(colName string) { defer wg.Done() - tc.MutateInMemoryColumns(false, constants.Delete, columns.NewColumn(colName, typing.Invalid)) + tc.MutateInMemoryColumns(constants.Delete, columns.NewColumn(colName, typing.Invalid)) }(removeCol) }