diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index e454723a4..c258a4fd2 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -46,22 +46,21 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts [] return replaceExceededValues(value, colKind), nil } -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { - if createTempTable { - tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), - Mode: tableData.Mode(), - } +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, _ bool) error { + // Snowflake always creates a temporary table + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()), + Mode: tableData.Mode(), + } - if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { - return fmt.Errorf("failed to create temp table: %w", err) - } + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table: %w", err) } // Write data into CSV diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index 90bdda710..e1c4aa369 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -10,6 +10,8 @@ import ( ) func (s *Store) Append(tableData *optimization.TableData) error { + tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) + var err error for i := 0; i < maxRetries; i++ { if i > 0 { @@ -24,12 +26,15 @@ func (s *Store) Append(tableData *optimization.TableData) error { } } - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. + temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix()) err = shared.Append(s, tableData, types.AppendOpts{ - TempTableID: tableID, + TempTableID: temporaryTableID, AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, }) + + if err == nil { + _, err = s.Exec(`INSERT INTO %s SELECT * FROM %s`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()) + } } return err