From de3f88e451dfbe4298fe30071a4e8c5302d85c9d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 1 Dec 2024 17:35:05 -0800 Subject: [PATCH] [DDL] Filter for valid columns on `CreateTable` (#1068) --- clients/shared/merge.go | 2 +- clients/shared/table.go | 32 +++++++++++++++++++++----------- lib/destination/ddl/ddl.go | 3 ++- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2d4fb20d2..743c7e317 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -54,7 +54,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi // TODO: Examine whether [AuditColumnsToDelete] still needs to be called. tableConfig.AuditColumnsToDelete(srcKeysMissing) if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { - return fmt.Errorf("failed to merge columns from destination: %w", err) + return fmt.Errorf("failed to merge columns from destination: %w for table %q", err, tableData.Name()) } temporaryTableID := TempTableIDWithSuffix(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) diff --git a/clients/shared/table.go b/clients/shared/table.go index cc0a0dad1..a2fdae0fa 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -16,11 +16,29 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) +func getValidColumns(cols []columns.Column) []columns.Column { + var validCols []columns.Column + for _, col := range cols { + if col.ShouldSkip() { + continue + } + + validCols = append(validCols, col) + } + + return validCols +} + func CreateTempTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier) error { return CreateTable(ctx, dwh, tableData, tc, settings, tableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()) } func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool, cols []columns.Column) error { + cols = getValidColumns(cols) + if len(cols) == 0 { + return nil + } + query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), cols) if err != nil { return fmt.Errorf("failed to build create table sql: %w", err) @@ -37,20 +55,12 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData * } func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, cols []columns.Column) error { + cols = getValidColumns(cols) if len(cols) == 0 { return nil } - var colsToAdd []columns.Column - for _, col := range cols { - if col.ShouldSkip() { - continue - } - - colsToAdd = append(colsToAdd, col) - } - - sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, colsToAdd) + sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, cols) if err != nil { return fmt.Errorf("failed to build alter table add columns: %w", err) } @@ -64,7 +74,7 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc } } - tc.MutateInMemoryColumns(constants.Add, colsToAdd...) + tc.MutateInMemoryColumns(constants.Add, cols...) return nil } diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 3d49a8ad2..d59775c1d 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -26,7 +26,8 @@ func BuildCreateTableSQL(settings config.SharedDestinationColumnSettings, dialec var primaryKeys []string for _, col := range columns { if col.ShouldSkip() { - continue + // It should be filtered upstream + return "", fmt.Errorf("received an invalid column %q", col.Name()) } colName := dialect.QuoteIdentifier(col.Name())