Skip to content

Commit

Permalink
Fix Snowflake bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 25, 2024
1 parent f16981d commit 6b1e49a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
29 changes: 14 additions & 15 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,21 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts []
return replaceExceededValues(value, colKind), nil
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
}
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, _ bool) error {
// Snowflake always creates a temporary table
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}

// Write data into CSV
Expand Down
11 changes: 8 additions & 3 deletions clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
)

func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())

var err error
for i := 0; i < maxRetries; i++ {
if i > 0 {
Expand All @@ -24,12 +26,15 @@ func (s *Store) Append(tableData *optimization.TableData) error {
}
}

tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
// TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing.
temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix())
err = shared.Append(s, tableData, types.AppendOpts{
TempTableID: tableID,
TempTableID: temporaryTableID,
AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`,
})

if err == nil {
_, err = s.Exec(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName())
}
}

return err
Expand Down

0 comments on commit 6b1e49a

Please sign in to comment.