From e527a655bc74f6bfed8f177581b65c30e5ce132b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:15:56 -0700 Subject: [PATCH] Clean up. --- clients/redshift/staging.go | 28 +++++++++++++++------------- clients/redshift/writes.go | 15 +-------------- clients/shared/append.go | 11 ++++++++--- lib/destination/ddl/ddl.go | 1 + 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 5c562034c..a38fa0864 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -10,25 +10,27 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/s3lib" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error { - // Redshift always creates a temporary table. - tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - Mode: tableData.Mode(), - } +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { + if createTempTable { + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), + } - if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { - return fmt.Errorf("failed to create temp table: %w", err) + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table: %w", err) + } } fp, err := s.loadTemporaryTable(tableData, tempTableID) diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go index fcaf08e67..edd130b95 100644 --- a/clients/redshift/writes.go +++ b/clients/redshift/writes.go @@ -1,8 +1,6 @@ package redshift import ( - "fmt" - "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" @@ -10,18 +8,7 @@ import ( func (s *Store) Append(tableData *optimization.TableData) error { tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - - // Redshift is slightly different, we'll load and create the temporary table via shared.Append - // Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs. - temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix()) - if err := shared.Append(s, tableData, types.AppendOpts{TempTableID: temporaryTableID}); err != nil { - return err - } - - _, err := s.Exec( - fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()), - ) - return err + return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) } func (s *Store) Merge(tableData *optimization.TableData) error { diff --git a/clients/shared/append.go b/clients/shared/append.go index 217758183..436e81976 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -23,9 +23,14 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op } // We don't care about srcKeysMissing because we don't drop columns when we append. - _, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), - tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, - tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode()) + _, targetKeysMissing := columns.Diff( + tableData.ReadOnlyInMemoryCols(), + tableConfig.Columns(), + tableData.TopicConfig().SoftDelete, + tableData.TopicConfig().IncludeArtieUpdatedAt, + tableData.TopicConfig().IncludeDatabaseUpdatedAt, + tableData.Mode(), + ) createAlterTableArgs := ddl.AlterTableArgs{ Dwh: dwh, diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 72c193811..609aa2787 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -20,6 +20,7 @@ import ( // It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key. // Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs func DropTemporaryTable(dwh destination.DataWarehouse, fqTableName string, shouldReturnError bool) error { + fmt.Println("dropping", fqTableName) if strings.Contains(strings.ToLower(fqTableName), constants.ArtiePrefix) { sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", fqTableName) slog.Debug("Dropping temporary table", slog.String("sql", sqlCommand))