Skip to content

Commit

Permalink
Pass TableIdentifier to TableConfig()
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Apr 21, 2024
1 parent bb34170 commit d431c64
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 157 deletions.
6 changes: 2 additions & 4 deletions clients/shared/table_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ func (g GetTableCfgArgs) ShouldParseComment(comment string) bool {
}

func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) {
fqName := g.TableID.FullyQualifiedName()

// Check if it already exists in cache
tableConfig := g.ConfigMap.TableConfig(fqName)
tableConfig := g.ConfigMap.TableConfig(g.TableID)
if tableConfig != nil {
return tableConfig, nil
}
Expand Down Expand Up @@ -134,6 +132,6 @@ func (g GetTableCfgArgs) GetTableConfig() (*types.DwhTableConfig, error) {
}

tableCfg := types.NewDwhTableConfig(&cols, nil, tableMissing, g.DropDeletedColumns)
g.ConfigMap.AddTableToConfig(fqName, tableCfg)
g.ConfigMap.AddTableToConfig(g.TableID, tableCfg)
return tableCfg, nil
}
6 changes: 3 additions & 3 deletions clients/shared/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ func TestGetTableConfig(t *testing.T) {
cols.AddColumn(columns.NewColumn(fmt.Sprintf("col-%v", i), typing.Invalid))
}

fqName := "dusty_the_mini_aussie"
tableID := MockTableIdentifier{"dusty_the_mini_aussie"}
dwhTableCfg := types.NewDwhTableConfig(cols, nil, false, false)

cm := &types.DwhToTablesConfigMap{}
cm.AddTableToConfig(fqName, dwhTableCfg)
cm.AddTableToConfig(tableID, dwhTableCfg)

actualTableCfg, err := GetTableCfgArgs{
Dwh: MockDWH{},
TableID: MockTableIdentifier{fqName},
TableID: tableID,
ConfigMap: cm,
}.GetTableConfig()

Expand Down
22 changes: 11 additions & 11 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
fqName := "coffee_shop.public.orders"
tableID := NewTableIdentifier("coffee_shop", "public", "orders", true)

var cols columns.Columns
for colName, kindDetails := range map[string]typing.KindDetails{
Expand All @@ -36,22 +36,22 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {

config := types.NewDwhTableConfig(&cols, nil, false, true)

s.stageStore.configMap.AddTableToConfig(fqName, config)
s.stageStore.configMap.AddTableToConfig(tableID, config)

nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfig(fqName)
tc := s.stageStore.configMap.TableConfig(tableID)

val := tc.ShouldDeleteColumn(nameCol.RawName(), time.Now().Add(-1*6*time.Hour), true)
assert.False(s.T(), val, "should not try to delete this column")
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(fqName).ReadOnlyColumnsToDelete()), 1)
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableID).ReadOnlyColumnsToDelete()), 1)

// Now let's try to add this column back, it should delete it from the cache.
tc.MutateInMemoryColumns(false, constants.Add, nameCol)
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(fqName).ReadOnlyColumnsToDelete()), 0)
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableID).ReadOnlyColumnsToDelete()), 0)
}

func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
fqName := "coffee_shop.orders.public"
tableID := NewTableIdentifier("coffee_shop", "orders", "public", true)
var cols columns.Columns
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
Expand All @@ -64,27 +64,27 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
}

config := types.NewDwhTableConfig(&cols, nil, false, true)
s.stageStore.configMap.AddTableToConfig(fqName, config)
s.stageStore.configMap.AddTableToConfig(tableID, config)

nameCol := columns.NewColumn("name", typing.String)
// Let's try to delete name.
allowed := s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.RawName(),
allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process tried to delete, but it's lagged.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.RawName(),
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process now caught up, and is asking if we can delete, should still be no.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.RawName(), time.Now(), true)
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), time.Now(), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete still")

// Process is finally ahead, has permission to delete now.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.RawName(),
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
time.Now().Add(2*constants.DeletionConfidencePadding), true)

assert.Equal(s.T(), allowed, true, "should now be allowed to delete")
Expand Down
26 changes: 13 additions & 13 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
"github.com/artie-labs/transfer/lib/typing/ext"
)

func (s *SnowflakeTestSuite) fullyQualifiedName(tableData *optimization.TableData) string {
tableID := s.stageStore.IdentifierFor(tableData.TopicConfig(), tableData.Name())
return tableID.FullyQualifiedName()
func (s *SnowflakeTestSuite) identifierFor(tableData *optimization.TableData) types.TableIdentifier {
return s.stageStore.IdentifierFor(tableData.TopicConfig(), tableData.Name())
}

func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
Expand Down Expand Up @@ -75,7 +74,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
anotherCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

s.stageStore.configMap.AddTableToConfig(s.fullyQualifiedName(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true))

err := s.stageStore.Merge(tableData)
_col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name")
Expand Down Expand Up @@ -120,7 +119,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
tableData.InsertRow(pk, row, false)
}

s.stageStore.configMap.AddTableToConfig(s.fullyQualifiedName(tableData), types.NewDwhTableConfig(&cols, nil, false, true))
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true))

s.fakeStageStore.ExecReturnsOnCall(0, nil, fmt.Errorf("390114: Authentication token has expired. The user must authenticate again."))
err := s.stageStore.Merge(tableData)
Expand Down Expand Up @@ -171,8 +170,9 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {

var idx int

fqName := s.fullyQualifiedName(tableData)
s.stageStore.configMap.AddTableToConfig(fqName, types.NewDwhTableConfig(&cols, nil, false, true))
tableID := s.identifierFor(tableData)
fqName := tableID.FullyQualifiedName()
s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true))
err := s.stageStore.Merge(tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
Expand Down Expand Up @@ -253,18 +253,18 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {

sflkCols.AddColumn(columns.NewColumn("new", typing.String))
_config := types.NewDwhTableConfig(&sflkCols, nil, false, true)
s.stageStore.configMap.AddTableToConfig(s.fullyQualifiedName(tableData), _config)
s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config)

err := s.stageStore.Merge(tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 5, "called merge")

// Check the temp deletion table now.
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(s.fullyQualifiedName(tableData)).ReadOnlyColumnsToDelete()), 1,
s.stageStore.configMap.TableConfig(s.fullyQualifiedName(tableData)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(s.identifierFor(tableData)).ReadOnlyColumnsToDelete()), 1,
s.stageStore.configMap.TableConfig(s.identifierFor(tableData)).ReadOnlyColumnsToDelete())

_, isOk := s.stageStore.configMap.TableConfig(s.fullyQualifiedName(tableData)).ReadOnlyColumnsToDelete()["new"]
_, isOk := s.stageStore.configMap.TableConfig(s.identifierFor(tableData)).ReadOnlyColumnsToDelete()["new"]
assert.True(s.T(), isOk)

// Now try to execute merge where 1 of the rows have the column now
Expand All @@ -285,8 +285,8 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 10, "called merge again")

// Caught up now, so columns should be 0.
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(s.fullyQualifiedName(tableData)).ReadOnlyColumnsToDelete()), 0,
s.stageStore.configMap.TableConfig(s.fullyQualifiedName(tableData)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(s.identifierFor(tableData)).ReadOnlyColumnsToDelete()), 0,
s.stageStore.configMap.TableConfig(s.identifierFor(tableData)).ReadOnlyColumnsToDelete())
}

func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() {
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func generateTableData(rows int) (TableIdentifier, *optimization.TableData) {
func (s *SnowflakeTestSuite) TestPrepareTempTable() {
tempTableID, tableData := generateTableData(10)
tempTableName := tempTableID.FullyQualifiedName()
s.stageStore.GetConfigMap().AddTableToConfig(tempTableName, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
sflkTc := s.stageStore.GetConfigMap().TableConfig(tempTableName)
s.stageStore.GetConfigMap().AddTableToConfig(tempTableID, types.NewDwhTableConfig(&columns.Columns{}, nil, true, true))
sflkTc := s.stageStore.GetConfigMap().TableConfig(tempTableID)

{
assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(tableData, sflkTc, tempTableID, types.AdditionalSettings{}, true))
Expand Down
36 changes: 18 additions & 18 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Testing 3 scenarios here
// 1. DropDeletedColumns = false, ContainOtherOperations = true, don't delete ever.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqName, types.NewDwhTableConfig(&cols, nil, false, false))
bqTc := d.bigQueryStore.GetConfigMap().TableConfig(bqName)
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, false))
bqTc := d.bigQueryStore.GetConfigMap().TableConfig(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftName, types.NewDwhTableConfig(&cols, nil, false, false))
redshiftTc := d.redshiftStore.GetConfigMap().TableConfig(redshiftName)
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, false))
redshiftTc := d.redshiftStore.GetConfigMap().TableConfig(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeName, types.NewDwhTableConfig(&cols, nil, false, false))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeName)
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, false))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeTableID)
// Prior to deletion, there should be no colsToDelete
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
Expand Down Expand Up @@ -123,14 +123,14 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqName, types.NewDwhTableConfig(&cols, nil, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfig(bqName)
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfig(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftName, types.NewDwhTableConfig(&cols, nil, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfig(redshiftName)
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfig(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeName, types.NewDwhTableConfig(&cols, nil, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeName)
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
Expand Down Expand Up @@ -200,14 +200,14 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

// 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqName, types.NewDwhTableConfig(&cols, nil, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfig(bqName)
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(&cols, nil, false, true))
bqTc = d.bigQueryStore.GetConfigMap().TableConfig(bqTableID)

d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftName, types.NewDwhTableConfig(&cols, nil, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfig(redshiftName)
d.redshiftStore.GetConfigMap().AddTableToConfig(redshiftTableID, types.NewDwhTableConfig(&cols, nil, false, true))
redshiftTc = d.redshiftStore.GetConfigMap().TableConfig(redshiftTableID)

d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeName, types.NewDwhTableConfig(&cols, nil, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeName)
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(snowflakeTableID, types.NewDwhTableConfig(&cols, nil, false, true))
snowflakeTc = d.snowflakeStagesStore.GetConfigMap().TableConfig(snowflakeTableID)

// Prior to deletion, there should be no colsToDelete
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
Expand Down
Loading

0 comments on commit d431c64

Please sign in to comment.