Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 2, 2024
1 parent 4153191 commit e527a65
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 30 deletions.
28 changes: 15 additions & 13 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,27 @@ import (

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"

"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/s3lib"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error {
// Redshift always creates a temporary table.
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
}

fp, err := s.loadTemporaryTable(tableData, tempTableID)
Expand Down
15 changes: 1 addition & 14 deletions clients/redshift/writes.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package redshift

import (
"fmt"

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
)

func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())

// Redshift is slightly different, we'll load and create the temporary table via shared.Append
// Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs.
temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix())
if err := shared.Append(s, tableData, types.AppendOpts{TempTableID: temporaryTableID}); err != nil {
return err
}

_, err := s.Exec(
fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()),
)
return err
return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID})
}

func (s *Store) Merge(tableData *optimization.TableData) error {
Expand Down
11 changes: 8 additions & 3 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Expand Down
1 change: 1 addition & 0 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key.
// Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs
func DropTemporaryTable(dwh destination.DataWarehouse, fqTableName string, shouldReturnError bool) error {
fmt.Println("dropping", fqTableName)
if strings.Contains(strings.ToLower(fqTableName), constants.ArtiePrefix) {
sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", fqTableName)
slog.Debug("Dropping temporary table", slog.String("sql", sqlCommand))
Expand Down

0 comments on commit e527a65

Please sign in to comment.