From 85130a7bd511164626f7b7a3be7c59d2f6fa3798 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 26 Nov 2024 23:10:34 -0800 Subject: [PATCH] Clean up. --- clients/shared/merge.go | 13 ------------- clients/shared/table.go | 41 +++++++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index ed25f10a1..743c7e317 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -36,12 +36,6 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi tableData.Mode(), ) - fmt.Println("targetKeysMissing", targetKeysMissing, "srcKeysMissing", srcKeysMissing) - - for _, col := range tableConfig.GetColumns() { - fmt.Println("col", col.Name(), "colKind", col.KindDetails, "tableName", tableData.Name()) - } - tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, targetKeysMissing); err != nil { @@ -60,13 +54,6 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi // TODO: Examine whether [AuditColumnsToDelete] still needs to be called. tableConfig.AuditColumnsToDelete(srcKeysMissing) if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { - slog.Error("failed to merge columns from destination", - slog.Any("err", err), - slog.String("tableName", tableID.FullyQualifiedName()), - slog.Any("srcKeysMissing", srcKeysMissing), - slog.Any("targetKeysMissing", targetKeysMissing), - ) - return fmt.Errorf("failed to merge columns from destination: %w for table %q", err, tableData.Name()) } diff --git a/clients/shared/table.go b/clients/shared/table.go index 4737c4b87..a2fdae0fa 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -16,21 +16,30 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func CreateTempTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier) error { - return CreateTable(ctx, dwh, tableData, tc, settings, tableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()) -} - -func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool, cols []columns.Column) error { - var colsToAdd []columns.Column +func getValidColumns(cols []columns.Column) []columns.Column { + var validCols []columns.Column for _, col := range cols { if col.ShouldSkip() { continue } - colsToAdd = append(colsToAdd, col) + validCols = append(validCols, col) } - query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), colsToAdd) + return validCols +} + +func CreateTempTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier) error { + return CreateTable(ctx, dwh, tableData, tc, settings, tableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()) +} + +func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool, cols []columns.Column) error { + cols = getValidColumns(cols) + if len(cols) == 0 { + return nil + } + + query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), cols) if err != nil { return fmt.Errorf("failed to build create table sql: %w", err) } @@ -41,25 +50,17 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData * } // Update cache with the new columns that we've added. - tc.MutateInMemoryColumns(constants.Add, colsToAdd...) + tc.MutateInMemoryColumns(constants.Add, cols...) return nil } func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, cols []columns.Column) error { + cols = getValidColumns(cols) if len(cols) == 0 { return nil } - var colsToAdd []columns.Column - for _, col := range cols { - if col.ShouldSkip() { - continue - } - - colsToAdd = append(colsToAdd, col) - } - - sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, colsToAdd) + sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, cols) if err != nil { return fmt.Errorf("failed to build alter table add columns: %w", err) } @@ -73,7 +74,7 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc } } - tc.MutateInMemoryColumns(constants.Add, colsToAdd...) + tc.MutateInMemoryColumns(constants.Add, cols...) return nil }