Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 31, 2024
1 parent b67409b commit 60f67da
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 76 deletions.
2 changes: 1 addition & 1 deletion clients/shared/table_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) {
tableMissing = true
}

tableCfg := types.NewDwhTableConfig(&cols, nil, tableMissing, g.DropDeletedColumns)
tableCfg := types.NewDwhTableConfig(&cols, tableMissing, g.DropDeletedColumns)
g.ConfigMap.AddTableToConfig(g.TableID, tableCfg)
return tableCfg, nil
}
7 changes: 4 additions & 3 deletions clients/shared/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ func TestGetTableConfig(t *testing.T) {
cols.AddColumn(columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid))
}

dwhTableCfg := types.NewDwhTableConfig(cols, nil, false, false)
cm := &types.DwhToTablesConfigMap{}
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("dusty_the_mini_aussie")
cm.AddTableToConfig(fakeTableID, dwhTableCfg)

tableCfg := types.NewDwhTableConfig(cols, false, false)
cm.AddTableToConfig(fakeTableID, tableCfg)

actualTableCfg, err := GetTableCfgArgs{
Dwh: &mocks.FakeDataWarehouse{},
Expand All @@ -31,5 +32,5 @@ func TestGetTableConfig(t *testing.T) {
}.GetTableConfig()

assert.NoError(t, err)
assert.Equal(t, dwhTableCfg, actualTableCfg)
assert.Equal(t, tableCfg, actualTableCfg)
}
25 changes: 9 additions & 16 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@ import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/config"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/typing"
)

func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
Expand All @@ -34,9 +30,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}

config := types.NewDwhTableConfig(&cols, nil, false, true)

s.stageStore.configMap.AddTableToConfig(tableID, config)
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))

nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfigCache(tableID)
Expand All @@ -63,7 +57,7 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}

config := types.NewDwhTableConfig(&cols, nil, false, true)
config := types.NewDwhTableConfig(&cols, false, true)
s.stageStore.configMap.AddTableToConfig(tableID, config)

nameCol := columns.NewColumn("name", typing.String)
Expand Down Expand Up @@ -102,9 +96,8 @@ func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}

tc := types.NewDwhTableConfig(&cols, map[string]time.Time{
"customer_id": time.Now(),
}, false, false)
tc := types.NewDwhTableConfig(&cols, false, false)
tc.SetColumnsToDelete(map[string]time.Time{"customer_id": time.Now()})

assert.Equal(s.T(), len(tc.ReadOnlyColumnsToDelete()), 1)
assert.False(s.T(), tc.ShouldDeleteColumn("customer_id",
Expand Down
8 changes: 4 additions & 4 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
anotherCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, false, true))

assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
_col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name")
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
tableData.InsertRow(pk, row, false)
}

s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, false, true))
assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount())
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {

tableID := s.identifierFor(tableData)
fqName := tableID.FullyQualifiedName()
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true))
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))
err := s.stageStore.Merge(context.Background(), tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
}

sflkCols.AddColumn(columns.NewColumn("new", typing.String))
_config := types.NewDwhTableConfig(&sflkCols, nil, false, true)
_config := types.NewDwhTableConfig(&sflkCols, false, true)
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config)

assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData))
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func generateTableData(rows int) (TableIdentifier, *optimization.TableData) {
func (s *SnowflakeTestSuite) TestPrepareTempTable() {
tempTableID, tableData := generateTableData(10)
tempTableName := tempTableID.FullyQualifiedName()
s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID)

{
Expand Down
22 changes: 11 additions & 11 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/config"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
)

func (d *DDLTestSuite) TestAlterDelete_Complete() {
Expand Down Expand Up @@ -44,13 +44,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Testing 3 scenarios here
// 1. DropDeletedColumns = false, ContainOtherOperations = true, don't delete ever.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, false))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, false))
bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, false))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, false))
redshiftTc := d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, false))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, false))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)
// Prior to deletion, there should be no colsToDelete
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
Expand Down Expand Up @@ -119,13 +119,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
Expand Down Expand Up @@ -193,13 +193,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfigCache(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfigCache(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfigCache(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
Expand Down
16 changes: 7 additions & 9 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ import (
"fmt"
"time"

"github.com/artie-labs/transfer/clients/bigquery"
"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/clients/bigquery"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
Expand All @@ -45,7 +43,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name())
fqName := tableID.FullyQualifiedName()
originalColumnLength := len(cols.GetColumns())
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, true))
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)

// Prior to deletion, there should be no colsToDelete
Expand Down Expand Up @@ -121,7 +119,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
existingCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, nil, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
Expand Down Expand Up @@ -179,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
existingCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, nil, false, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&existingCols, false, true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
Expand Down Expand Up @@ -237,7 +235,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {

tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name())
originalColumnLength := len(columnNameToKindDetailsMap)
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, false))
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, false, false))
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)

// Prior to deletion, there should be no colsToDelete
Expand Down
6 changes: 3 additions & 3 deletions lib/destination/ddl/ddl_create_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (

func (d *DDLTestSuite) Test_CreateTable() {
bqTableID := bigquery.NewTableIdentifier("", "mock_dataset", "mock_table")
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))

snowflakeTableID := snowflake.NewTableIdentifier("", "mock_dataset", "mock_table")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))

type dwhToTableConfig struct {
_tableID sql.TableIdentifier
Expand Down Expand Up @@ -115,7 +115,7 @@ func (d *DDLTestSuite) TestCreateTable() {

for index, testCase := range testCases {
tableID := snowflake.NewTableIdentifier("demo", "public", "experiments")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, true, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
Expand Down
15 changes: 9 additions & 6 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "complex_columns")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
Expand Down Expand Up @@ -58,7 +58,7 @@ func (d *DDLTestSuite) TestAlterIdempotency() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "orders")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists"))
Expand Down Expand Up @@ -88,7 +88,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "orders")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

alterTableArgs := ddl.AlterTableArgs{
Expand Down Expand Up @@ -130,7 +130,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
}

tableID := snowflake.NewTableIdentifier("shop", "public", "users")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, nil, false, true))
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, false, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
Expand Down Expand Up @@ -190,11 +190,14 @@ func (d *DDLTestSuite) TestAlterTableDelete() {

tableID := snowflake.NewTableIdentifier("shop", "public", "users1")

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(&columns.Columns{}, map[string]time.Time{
tableCfg := types.NewDwhTableConfig(&columns.Columns{}, false, true)
tableCfg.SetColumnsToDelete(map[string]time.Time{
"col_to_delete": time.Now().Add(-2 * constants.DeletionConfidencePadding),
"answers": time.Now().Add(-2 * constants.DeletionConfidencePadding),
"start": time.Now().Add(-2 * constants.DeletionConfidencePadding),
}, false, true))
})

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, tableCfg)

tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
alterTableArgs := ddl.AlterTableArgs{
Expand Down
Loading

0 comments on commit 60f67da

Please sign in to comment.