Skip to content

Commit

Permalink
Merge branch 'master' into simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 15, 2024
2 parents 0114a14 + 9e9d904 commit 7bac292
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 314 deletions.
4 changes: 2 additions & 2 deletions clients/databricks/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ type File struct {
}

func NewFile(fileRow map[string]any) (File, error) {
name, err := maputil.GetStringFromMap(fileRow, "name")
name, err := maputil.GetTypeFromMap[string](fileRow, "name")
if err != nil {
return File{}, err
}

fp, err := maputil.GetStringFromMap(fileRow, "path")
fp, err := maputil.GetTypeFromMap[string](fileRow, "path")
if err != nil {
return File{}, err
}
Expand Down
9 changes: 4 additions & 5 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim

// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(),
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())

if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
createAlterTableArgs := ddl.AlterTableArgs{
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
Expand All @@ -49,7 +48,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
}

// Keys that exist in CDC stream, but not in DWH
if err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

Expand Down
17 changes: 11 additions & 6 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return fmt.Errorf("failed to get table config: %w", err)
}

srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())
srcKeysMissing, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
createAlterTableArgs := ddl.AlterTableArgs{
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
Expand All @@ -48,7 +53,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
}

// Columns that are missing in DWH, but exist in our CDC stream.
if err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}
Expand All @@ -69,7 +74,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func (s *SnowflakeTestSuite) TestGetTableConfig() {
assert.NoError(s.T(), err)

assert.True(s.T(), tableConfig.CreateTable())
assert.Equal(s.T(), len(tableConfig.Columns().GetColumns()), 0)
assert.Len(s.T(), tableConfig.GetColumns(), 0)
assert.False(s.T(), tableConfig.DropDeletedColumns())
}
10 changes: 0 additions & 10 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col
var mutateCol []columns.Column
// It's okay to combine since args.ColumnOp only takes one of: `Delete` or `Add`
var colSQLParts []string
var pkCols []string
for _, col := range cols {
if col.ShouldSkip() {
// Let's not modify the table if the column kind is invalid
Expand All @@ -122,15 +121,6 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col
}
}

if len(pkCols) > 0 {
pkStatement := fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(pkCols, ", "))
if _, ok := a.Dialect.(bigQueryDialect.BigQueryDialect); ok {
pkStatement += " NOT ENFORCED"
}

colSQLParts = append(colSQLParts, pkStatement)
}

var alterStatements []string
for _, colSQLPart := range colSQLParts {
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart))
Expand Down
37 changes: 18 additions & 19 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

// BigQuery
for _, column := range cols.GetColumns() {
Expand All @@ -94,7 +94,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)

// Redshift
for _, column := range cols.GetColumns() {
Expand All @@ -113,7 +113,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)

// 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
Expand Down Expand Up @@ -146,7 +146,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

// BigQuery
for _, column := range cols.GetColumns() {
Expand All @@ -165,7 +165,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)

// Redshift
for _, column := range cols.GetColumns() {
Expand All @@ -184,7 +184,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)

// 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
Expand Down Expand Up @@ -249,13 +249,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
}

// Nothing has been deleted, but it is all added to the permissions table.
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

assert.Equal(d.T(), originalColumnLength, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Len(d.T(), bqTc.ReadOnlyColumnsToDelete(), originalColumnLength)
assert.Len(d.T(), redshiftTc.ReadOnlyColumnsToDelete(), originalColumnLength)
assert.Len(d.T(), snowflakeTc.ReadOnlyColumnsToDelete(), originalColumnLength)

for _, column := range cols.GetColumns() {
alterTableArgs := ddl.AlterTableArgs{
Expand Down Expand Up @@ -298,14 +298,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
}

// Everything has been deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Equal(d.T(), 0, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Equal(d.T(), 0, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), snowflakeTc.GetColumns())
assert.Empty(d.T(), bqTc.GetColumns())
assert.Empty(d.T(), redshiftTc.GetColumns())

assert.Empty(d.T(), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), bqTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), redshiftTc.ReadOnlyColumnsToDelete())
allColsMap := make(map[string]bool, len(allCols))
for _, allCol := range allCols {
allColsMap[allCol] = true
Expand Down
22 changes: 11 additions & 11 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
// Have not deleted, but tried to!
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
// Columns have not been deleted yet.
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)

// Now try to delete again and with an increased TS. It should now be all deleted.
var callIdx int
Expand All @@ -89,7 +89,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
// Columns have now been deleted.
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
// Columns have not been deleted yet.
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), 0)
assert.Equal(d.T(), originalColumnLength, d.fakeBigQueryStore.ExecCallCount())
}

Expand Down Expand Up @@ -119,8 +119,8 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete(), 0)
assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))

var callIdx int
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
Expand All @@ -144,9 +144,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
}

// Check all the columns, make sure it's correct. (length)
assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() {
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() {
existingCol, isOk := existingCols.GetColumn(column.Name())
if !isOk {
// Check new cols?
Expand Down Expand Up @@ -177,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))

tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
var callIdx int
Expand All @@ -202,9 +202,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
}

// Check all the columns, make sure it's correct. (length)
assert.Equal(d.T(), existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), existingColsLen)
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() {
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() {
existingCol, isOk := existingCols.GetColumn(column.Name())
assert.True(d.T(), isOk)
assert.Equal(d.T(), column.KindDetails, existingCol.KindDetails)
Expand Down Expand Up @@ -249,7 +249,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {
}

assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()))
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()))
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)

// Now try to delete again and with an increased TS. It should now be all deleted.
for _, column := range cols.GetColumns() {
Expand All @@ -268,5 +268,5 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {

// Columns still exist
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()))
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()))
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)
}
6 changes: 2 additions & 4 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {

// Check the table config
tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
for _, column := range tableConfig.Columns().GetColumns() {
for _, column := range tableConfig.GetColumns() {
var found bool
for _, expCol := range cols {
if found = column.Name() == expCol.Name(); found {
Expand All @@ -113,9 +113,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
}
}

assert.True(d.T(), found,
fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v",
column.Name(), tableConfig.Columns(), cols))
assert.True(d.T(), found, fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", column.Name(), tableConfig.GetColumns(), cols))
}
}

Expand Down
7 changes: 7 additions & 0 deletions lib/destination/types/table_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (d *DwhTableConfig) DropDeletedColumns() bool {
return d.dropDeletedColumns
}

func (d *DwhTableConfig) GetColumns() []columns.Column {
d.RLock()
defer d.RUnlock()

return d.columns.GetColumns()
}

func (d *DwhTableConfig) Columns() *columns.Columns {
if d == nil {
return nil
Expand Down
10 changes: 5 additions & 5 deletions lib/destination/types/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) {
go func(tableCfg *types.DwhTableConfig) {
defer wg.Done()
for j := 0; j < 100; j++ {
assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns())

assert.Len(t, tableCfg.GetColumns(), 3)
kindDetails := typing.Integer
if (j % 2) == 0 {
kindDetails = typing.Array
}

tableCfg.Columns().UpdateColumn(columns.NewColumn("foo", kindDetails))
assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns())
assert.Len(t, tableCfg.GetColumns(), 3)
}
}(dwhTableCfg)
}
Expand All @@ -85,7 +85,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
tc.MutateInMemoryColumns(constants.Add, columns.NewColumn(col, typing.String))
}

assert.Len(t, tc.Columns().GetColumns(), 5)
assert.Len(t, tc.GetColumns(), 5)
var wg sync.WaitGroup
for _, addCol := range []string{"aa", "bb", "cc", "dd", "ee", "ff"} {
wg.Add(1)
Expand All @@ -104,7 +104,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
}

wg.Wait()
assert.Len(t, tc.Columns().GetColumns(), 6)
assert.Len(t, tc.GetColumns(), 6)
}

func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions lib/maputil/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ func GetInt32FromMap(obj map[string]any, key string) (int32, error) {
return int32(val), nil
}

func GetStringFromMap(obj map[string]any, key string) (string, error) {
func GetTypeFromMap[T any](obj map[string]any, key string) (T, error) {
value, isOk := obj[key]
if !isOk {
return "", fmt.Errorf("key: %q does not exist in object", key)
var zero T
return zero, fmt.Errorf("key: %q does not exist in object", key)
}

return typing.AssertType[string](value)
return typing.AssertType[T](value)
}
Loading

0 comments on commit 7bac292

Please sign in to comment.