Skip to content

Commit

Permalink
[DDL] More refactor around AlterTableArgs (#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 15, 2024
1 parent e034281 commit 9e9d904
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 247 deletions.
31 changes: 19 additions & 12 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,26 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Keys that exist in CDC stream, but not in DWH
if err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Keys that exist in CDC stream, but not in DWH
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

}

if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
Expand Down
30 changes: 17 additions & 13 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,31 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Columns that are missing in DWH, but exist in our CDC stream.
if err = createAlterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
// Columns that are missing in DWH, but exist in our CDC stream.
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}

// Keys that exist in DWH, but not in our CDC stream.
deleteAlterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
CreateTable: false,
ColumnOp: constants.Delete,
ContainOtherOperations: tableData.ContainOtherOperations(),
CdcTime: tableData.LatestCDCTs,
Expand Down
2 changes: 1 addition & 1 deletion clients/shared/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *
}

// Update cache with the new columns that we've added.
tc.MutateInMemoryColumns(true, constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...)
tc.MutateInMemoryColumns(false, constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...)
return nil
}
31 changes: 3 additions & 28 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type AlterTableArgs struct {
// ContainOtherOperations - this is sourced from tableData `containOtherOperations`
ContainOtherOperations bool
TableID sql.TableIdentifier
CreateTable bool
ColumnOp constants.ColumnOperation
Mode config.Mode
CdcTime time.Time
Expand All @@ -86,11 +85,6 @@ func (a AlterTableArgs) Validate() error {
return fmt.Errorf("dialect cannot be nil")
}

// You can't DROP a column and try to create a table at the same time.
if a.ColumnOp == constants.Delete && a.CreateTable {
return fmt.Errorf("incompatible operation - cannot drop columns and create table at the same time")
}

if !(a.Mode == config.History || a.Mode == config.Replication) {
return fmt.Errorf("unexpected mode: %s", a.Mode.String())
}
Expand All @@ -106,7 +100,6 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col
var mutateCol []columns.Column
// It's okay to combine since args.ColumnOp only takes one of: `Delete` or `Add`
var colSQLParts []string
var pkCols []string
for _, col := range cols {
if col.ShouldSkip() {
// Let's not modify the table if the column kind is invalid
Expand All @@ -122,33 +115,15 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col
mutateCol = append(mutateCol, col)
switch a.ColumnOp {
case constants.Add:
colName := a.Dialect.QuoteIdentifier(col.Name())
if shouldCreatePrimaryKey(col, a.Mode, a.CreateTable) {
pkCols = append(pkCols, colName)
}

colSQLParts = append(colSQLParts, fmt.Sprintf("%s %s", colName, a.Dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey())))
colSQLParts = append(colSQLParts, fmt.Sprintf("%s %s", a.Dialect.QuoteIdentifier(col.Name()), a.Dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey())))
case constants.Delete:
colSQLParts = append(colSQLParts, a.Dialect.QuoteIdentifier(col.Name()))
}
}

if len(pkCols) > 0 {
pkStatement := fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(pkCols, ", "))
if _, ok := a.Dialect.(bigQueryDialect.BigQueryDialect); ok {
pkStatement += " NOT ENFORCED"
}

colSQLParts = append(colSQLParts, pkStatement)
}

var alterStatements []string
if a.CreateTable {
alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, false, colSQLParts)}
} else {
for _, colSQLPart := range colSQLParts {
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart))
}
for _, colSQLPart := range colSQLParts {
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart))
}

return alterStatements, mutateCol
Expand Down
12 changes: 0 additions & 12 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: snowflakeTableID,
CreateTable: snowflakeTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -83,7 +82,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.bigQueryStore.Dialect(),
Tc: bqTc,
TableID: bqTableID,
CreateTable: bqTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -104,7 +102,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.redshiftStore.Dialect(),
Tc: redshiftTc,
TableID: redshiftTableID,
CreateTable: redshiftTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand Down Expand Up @@ -138,7 +135,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: snowflakeTableID,
CreateTable: snowflakeTc.CreateTable(),
ContainOtherOperations: false,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -158,7 +154,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.bigQueryStore.Dialect(),
Tc: bqTc,
TableID: bqTableID,
CreateTable: bqTc.CreateTable(),
ContainOtherOperations: false,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -178,7 +173,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.redshiftStore.Dialect(),
Tc: redshiftTc,
TableID: redshiftTableID,
CreateTable: redshiftTc.CreateTable(),
ContainOtherOperations: false,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand Down Expand Up @@ -215,7 +209,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: snowflakeTableID,
CreateTable: snowflakeTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -231,7 +224,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.bigQueryStore.Dialect(),
Tc: bqTc,
TableID: bqTableID,
CreateTable: bqTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -247,7 +239,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.redshiftStore.Dialect(),
Tc: redshiftTc,
TableID: redshiftTableID,
CreateTable: redshiftTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Expand All @@ -271,7 +262,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: snowflakeTableID,
CreateTable: snowflakeTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Expand All @@ -285,7 +275,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.bigQueryStore.Dialect(),
Tc: bqTc,
TableID: bqTableID,
CreateTable: bqTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Expand All @@ -299,7 +288,6 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
Dialect: d.redshiftStore.Dialect(),
Tc: redshiftTc,
TableID: redshiftTableID,
CreateTable: redshiftTc.CreateTable(),
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Expand Down
54 changes: 24 additions & 30 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Delete,
ContainOtherOperations: true,
CdcTime: ts,
Expand All @@ -75,7 +74,6 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Delete,
ContainOtherOperations: true,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Expand Down Expand Up @@ -128,13 +126,12 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
for name, kind := range newCols {
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Add,
CdcTime: ts,
Mode: config.Replication,
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: ts,
Mode: config.Replication,
}

col := columns.NewColumn(name, kind)
Expand Down Expand Up @@ -189,13 +186,12 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
// BQ returning the same error because the column already exists.
d.fakeBigQueryStore.ExecReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]"))
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Add,
CdcTime: ts,
Mode: config.Replication,
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: ts,
Mode: config.Replication,
}

assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column))
Expand Down Expand Up @@ -242,13 +238,12 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
for _, column := range cols.GetColumns() {
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Delete,
CdcTime: ts,
Mode: config.Replication,
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
ColumnOp: constants.Delete,
CdcTime: ts,
Mode: config.Replication,
}
assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column))
}
Expand All @@ -259,13 +254,12 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {
// Now try to delete again and with an increased TS. It should now be all deleted.
for _, column := range cols.GetColumns() {
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
CreateTable: tc.CreateTable(),
ColumnOp: constants.Delete,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Mode: config.Replication,
Dialect: d.bigQueryStore.Dialect(),
Tc: tc,
TableID: tableID,
ColumnOp: constants.Delete,
CdcTime: ts.Add(2 * constants.DeletionConfidencePadding),
Mode: config.Replication,
}

assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column))
Expand Down
Loading

0 comments on commit 9e9d904

Please sign in to comment.