Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Clean.
Browse files Browse the repository at this point in the history
Tang8330 committed Oct 31, 2024
1 parent 60f67da commit 432fd6f
Showing 14 changed files with 70 additions and 71 deletions.
13 changes: 3 additions & 10 deletions clients/shared/table_config.go
Original file line number Diff line number Diff line change
@@ -47,19 +47,17 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) {
}
}()

var tableMissing bool
if err != nil {
if g.Dwh.Dialect().IsTableDoesNotExistErr(err) {
// This branch is currently only used by Snowflake.
// Swallow the error, make sure all the metadata is created
tableMissing = true
err = nil
} else {
return nil, fmt.Errorf("failed to query %T, err: %w, query: %q", g.Dwh, err, g.Query)
}
}

var cols columns.Columns
var cols []columns.Column
for rows != nil && rows.Next() {
// figure out what columns were returned
// the column names will be the JSON object field keys
@@ -126,15 +124,10 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) {
return nil, fmt.Errorf("unknown default value strategy: %q", strategy)
}

cols.AddColumn(col)
cols = append(cols, col)
}

// Do it this way via rows.Next() because that will move the iterator and cause us to miss a column.
if len(cols.GetColumns()) == 0 {
tableMissing = true
}

tableCfg := types.NewDwhTableConfig(&cols, tableMissing, g.DropDeletedColumns)
tableCfg := types.NewDwhTableConfig(cols, g.DropDeletedColumns)
g.ConfigMap.AddTableToConfig(g.TableID, tableCfg)
return tableCfg, nil
}
6 changes: 3 additions & 3 deletions clients/shared/table_config_test.go
Original file line number Diff line number Diff line change
@@ -13,16 +13,16 @@ import (

func TestGetTableConfig(t *testing.T) {
// Return early because table is found in configMap.
cols := &columns.Columns{}
var cols []columns.Column
for i := range 100 {
cols.AddColumn(columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid))
cols = append(cols, columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid))
}

cm := &types.DwhToTablesConfigMap{}
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("dusty_the_mini_aussie")

tableCfg := types.NewDwhTableConfig(cols, false, false)
tableCfg := types.NewDwhTableConfig(cols, false)
cm.AddTableToConfig(fakeTableID, tableCfg)

actualTableCfg, err := GetTableCfgArgs{
18 changes: 9 additions & 9 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
@@ -19,18 +19,18 @@ import (
func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
tableID := NewTableIdentifier("coffee_shop", "public", "orders")

var cols columns.Columns
var cols []columns.Column
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
cols = append(cols, columns.NewColumn(colName, kindDetails))
}

s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(cols, true))

nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfigCache(tableID)
@@ -46,18 +46,18 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {

func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
tableID := NewTableIdentifier("coffee_shop", "orders", "public")
var cols columns.Columns
var cols []columns.Column
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
cols = append(cols, columns.NewColumn(colName, kindDetails))
}

config := types.NewDwhTableConfig(&cols, false, true)
config := types.NewDwhTableConfig(cols, true)
s.stageStore.configMap.AddTableToConfig(tableID, config)

nameCol := columns.NewColumn("name", typing.String)
@@ -85,18 +85,18 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
}

func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() {
var cols columns.Columns
var cols []columns.Column
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.MustNewExtendedTimeDetails(typing.ETime, ext.TimestampTZKindType, ""),
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
cols = append(cols, columns.NewColumn(colName, kindDetails))
}

tc := types.NewDwhTableConfig(&cols, false, false)
tc := types.NewDwhTableConfig(cols, false)
tc.SetColumnsToDelete(map[string]time.Time{"customer_id": time.Now()})

assert.Equal(s.T(), len(tc.ReadOnlyColumnsToDelete()), 1)
12 changes: 6 additions & 6 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
@@ -72,12 +72,12 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
constants.OnlySetDeleteColumnMarker: typing.Boolean,
}

var anotherCols columns.Columns
var anotherCols []columns.Column
for colName, kindDetails := range anotherColToKindDetailsMap {
anotherCols.AddColumn(columns.NewColumn(colName, kindDetails))
anotherCols = append(anotherCols, columns.NewColumn(colName, kindDetails))
}

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(anotherCols, true))

assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
_col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name")
@@ -122,7 +122,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
tableData.InsertRow(pk, row, false)
}

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(cols.GetColumns(), true))
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount())
}
@@ -170,7 +170,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {

tableID := s.identifierFor(tableData)
fqName := tableID.FullyQualifiedName()
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), true))
err := s.stageStore.Merge(context.Background(), tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
@@ -252,7 +252,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
}

sflkCols.AddColumn(columns.NewColumn("new", typing.String))
_config := types.NewDwhTableConfig(&sflkCols, false, true)
_config := types.NewDwhTableConfig(sflkCols.GetColumns(), true)
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config)

assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
2 changes: 1 addition & 1 deletion clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
@@ -153,7 +153,7 @@ func generateTableData(rows int) (TableIdentifier, *optimization.TableData) {
func (s *SnowflakeTestSuite) TestPrepareTempTable() {
tempTableID, tableData := generateTableData(10)
tempTableName := tempTableID.FullyQualifiedName()
s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(nil, true))
sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID)

{
18 changes: 9 additions & 9 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
@@ -44,13 +44,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Testing 3 scenarios here
// 1. DropDeletedColumns = false, ContainOtherOperations = true, don't delete ever.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, false))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), false))
bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, false))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), false))
redshiftTc := d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, false))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), false))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)
// Prior to deletion, there should be no colsToDelete
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
@@ -119,13 +119,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
@@ -193,13 +193,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
8 changes: 4 additions & 4 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name())
fqName := tableID.FullyQualifiedName()
originalColumnLength := len(cols.GetColumns())
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), true))
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)

// Prior to deletion, there should be no colsToDelete
@@ -119,7 +119,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
existingCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
@@ -177,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
existingCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
@@ -235,7 +235,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {

tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name())
originalColumnLength := len(columnNameToKindDetailsMap)
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, false))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), false))
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)

// Prior to deletion, there should be no colsToDelete
6 changes: 3 additions & 3 deletions lib/destination/ddl/ddl_create_table_test.go
Original file line number Diff line number Diff line change
@@ -21,10 +21,10 @@ import (

func (d *DDLTestSuite) Test_CreateTable() {
bqTableID := bigquery.NewTableIdentifier("", "mock_dataset", "mock_table")
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(nil, true))

snowflakeTableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(nil, true))

type dwhToTableConfig struct {
_tableID sql.TableIdentifier
@@ -115,7 +115,7 @@ func (d *DDLTestSuite) TestCreateTable() {

for index, testCase := range testCases {
tableID := snowflake.NewTableIdentifier("demo", "public", "experiments")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
10 changes: 5 additions & 5 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "complex_columns")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
@@ -58,7 +58,7 @@ func (d *DDLTestSuite) TestAlterIdempotency() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "orders")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists"))
@@ -88,7 +88,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "orders")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
@@ -130,7 +130,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "users")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
@@ -190,7 +190,7 @@ func (d *DDLTestSuite) TestAlterTableDelete() {

tableID := snowflake.NewTableIdentifier("shop", "public", "users1")

tableCfg := types.NewDwhTableConfig(&columns.Columns{}, false, true)
tableCfg := types.NewDwhTableConfig(nil, true)
tableCfg.SetColumnsToDelete(map[string]time.Time{
"col_to_delete": time.Now().Add(-2 * constants.DeletionConfidencePadding),
"answers": time.Now().Add(-2 * constants.DeletionConfidencePadding),
8 changes: 4 additions & 4 deletions lib/destination/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ func (d *DDLTestSuite) TestValidate_AlterTableArgs() {

func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() {
tableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
@@ -55,7 +55,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() {
assert.ErrorContains(d.T(), args.AlterTable(d.snowflakeStagesStore), "incompatible operation - cannot drop columns and create table at the same time")

// Change it to SFLK + Stage
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
snowflakeStagesTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args.Dialect = d.snowflakeStagesStore.Dialect()
args.Tc = snowflakeStagesTc
@@ -69,7 +69,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
// Snowflake Stage
tableID := snowflake.NewTableIdentifier("db", "schema", "tempTableName")

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
sflkStageTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
@@ -94,7 +94,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
{
// BigQuery
tableID := bigquery.NewTableIdentifier("db", "schema", "tempTableName")
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
6 changes: 3 additions & 3 deletions lib/destination/types/table_config.go
Original file line number Diff line number Diff line change
@@ -21,11 +21,11 @@ type DwhTableConfig struct {
sync.RWMutex
}

func NewDwhTableConfig(columns *columns.Columns, createTable, dropDeletedColumns bool) *DwhTableConfig {
func NewDwhTableConfig(cols []columns.Column, dropDeletedColumns bool) *DwhTableConfig {
return &DwhTableConfig{
columns: columns,
columns: columns.NewColumns(cols),
columnsToDelete: make(map[string]time.Time),
createTable: createTable,
createTable: len(cols) == 0,
dropDeletedColumns: dropDeletedColumns,
}
}
22 changes: 11 additions & 11 deletions lib/destination/types/table_config_test.go
Original file line number Diff line number Diff line change
@@ -20,23 +20,23 @@ import (
func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) {
// Test 3 different possibilities:
// 1. DropDeletedColumns = false, so don't delete.
dwhTableConfig := types.NewDwhTableConfig(&columns.Columns{}, false, false)
dwhTableConfig := types.NewDwhTableConfig(nil, false)
for i := 0; i < 100; i++ {
results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true)
assert.False(t, results)
assert.Equal(t, len(dwhTableConfig.ReadOnlyColumnsToDelete()), 0)
}

// 2. DropDeletedColumns = true and ContainsOtherOperations = false, so don't delete
dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true)
dwhTableConfig = types.NewDwhTableConfig(nil, true)
for i := 0; i < 100; i++ {
results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), false)
assert.False(t, results)
assert.Equal(t, len(dwhTableConfig.ReadOnlyColumnsToDelete()), 0)
}

// 3. DropDeletedColumns = true and ContainsOtherOperations = true, now check CDC time to delete.
dwhTableConfig = types.NewDwhTableConfig(&columns.Columns{}, false, true)
dwhTableConfig = types.NewDwhTableConfig(nil, true)
for i := 0; i < 100; i++ {
results := dwhTableConfig.ShouldDeleteColumn("hello", time.Now().UTC(), true)
assert.False(t, results)
@@ -49,12 +49,12 @@ func TestDwhTableConfig_ShouldDeleteColumn(t *testing.T) {
// TestDwhTableConfig_ColumnsConcurrency this file is meant to test the concurrency methods of .Columns()
// In this test, we spin up 5 parallel Go-routines each making 100 calls to .Columns() and assert the validity of the data.
func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) {
var cols columns.Columns
cols.AddColumn(columns.NewColumn("foo", typing.Struct))
cols.AddColumn(columns.NewColumn("bar", typing.String))
cols.AddColumn(columns.NewColumn("boolean", typing.Boolean))
var cols []columns.Column
cols = append(cols, columns.NewColumn("foo", typing.Struct))
cols = append(cols, columns.NewColumn("bar", typing.String))
cols = append(cols, columns.NewColumn("boolean", typing.Boolean))

dwhTableCfg := types.NewDwhTableConfig(&cols, false, false)
dwhTableCfg := types.NewDwhTableConfig(cols, false)

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
@@ -78,7 +78,7 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) {
}

func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
tc := types.NewDwhTableConfig(&columns.Columns{}, false, false)
tc := types.NewDwhTableConfig(nil, false)
for _, col := range []string{"a", "b", "c", "d", "e"} {
tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String))
}
@@ -106,7 +106,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
}

func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) {
tc := types.NewDwhTableConfig(nil, false, false)
tc := types.NewDwhTableConfig(nil, false)
colsToDelete := make(map[string]time.Time)
for _, colToDelete := range []string{"a", "b", "c", "d"} {
colsToDelete[colToDelete] = time.Now()
@@ -154,7 +154,7 @@ func TestAuditColumnsToDelete(t *testing.T) {
}

for idx, tc := range tcs {
dwhTc := types.NewDwhTableConfig(nil, false, tc.dropDeletedCols)
dwhTc := types.NewDwhTableConfig(nil, tc.dropDeletedCols)
colsToDelete := make(map[string]time.Time)
for _, colToDelete := range colsToDeleteList {
colsToDelete[colToDelete] = time.Now()
6 changes: 3 additions & 3 deletions lib/destination/types/types_test.go
Original file line number Diff line number Diff line change
@@ -16,12 +16,12 @@ import (
)

func generateDwhTableCfg() *types.DwhTableConfig {
cols := &columns.Columns{}
var cols []columns.Column
for _, col := range []string{"a", "b", "c", "d"} {
cols.AddColumn(columns.NewColumn(col, typing.String))
cols = append(cols, columns.NewColumn(col, typing.String))
}

tableCfg := types.NewDwhTableConfig(cols, false, false)
tableCfg := types.NewDwhTableConfig(cols, false)
colsToDelete := make(map[string]time.Time)
for _, col := range []string{"foo", "bar", "abc", "xyz"} {
colsToDelete[col] = time.Now()
6 changes: 6 additions & 0 deletions lib/typing/columns/columns.go
Original file line number Diff line number Diff line change
@@ -101,6 +101,12 @@ type Columns struct {
sync.RWMutex
}

func NewColumns(columns []Column) *Columns {
return &Columns{
columns: columns,
}
}

type UpsertColumnArg struct {
ToastCol *bool
PrimaryKey *bool

0 comments on commit 432fd6f

Please sign in to comment.