Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Apr 21, 2024
1 parent 156292f commit c456f16
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error {
tempTableName := tempTableID.FullyQualifiedName()

// Redshift always creates a temporary table.
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand All @@ -35,7 +33,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
return fmt.Errorf("failed to create temp table: %w", err)
}

fp, err := s.loadTemporaryTable(tableData, tempTableName)
fp, err := s.loadTemporaryTable(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
}
Expand All @@ -61,16 +59,20 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
// COPY table_name FROM '/path/to/local/file' DELIMITER '\t' NULL '\\N' FORMAT csv;
// Note, we need to specify `\\N` here and in `CastColVal(..)` we are only doing `\N`, this is because Redshift treats backslashes as an escape character.
// So, it'll convert `\N` => `\\N` during COPY.
copyStmt := fmt.Sprintf(`COPY %s FROM '%s' DELIMITER '\t' NULL AS '\\N' GZIP FORMAT CSV %s dateformat 'auto' timeformat 'auto';`, tempTableName, s3Uri, s.credentialsClause)
copyStmt := fmt.Sprintf(
`COPY %s FROM '%s' DELIMITER '\t' NULL AS '\\N' GZIP FORMAT CSV %s dateformat 'auto' timeformat 'auto';`,
tempTableID.FullyQualifiedName(),
s3Uri, s.credentialsClause,
)
if _, err = s.Exec(copyStmt); err != nil {
return fmt.Errorf("failed to run COPY for temporary table: %w", err)
}

return nil
}

func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableName string) (string, error) {
filePath := fmt.Sprintf("/tmp/%s.csv.gz", newTableName)
func (s *Store) loadTemporaryTable(tableData *optimization.TableData, tableID types.TableIdentifier) (string, error) {
filePath := fmt.Sprintf("/tmp/%s.csv.gz", tableID.FullyQualifiedName())
file, err := os.Create(filePath)
if err != nil {
return "", err
Expand Down

0 comments on commit c456f16

Please sign in to comment.