From 0a635d1d5e346dbac3dde964389ec315d41cd46b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 1 Dec 2024 17:39:58 -0800 Subject: [PATCH] [Snowflake] Clean up staging files after a failed `COPY INTO` (#1069) --- clients/snowflake/dialect/dialect.go | 6 ++++++ clients/snowflake/dialect/dialect_test.go | 17 +++++++++++++++++ clients/snowflake/staging.go | 9 +++++++++ 3 files changed, 32 insertions(+) diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 09b341089..811bb7d57 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -2,6 +2,7 @@ package dialect import ( "fmt" + "path/filepath" "strings" "github.com/artie-labs/transfer/lib/config/constants" @@ -152,3 +153,8 @@ FROM WHERE UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} } + +func (SnowflakeDialect) BuildRemoveFilesFromStage(stageName string, path string) string { + // https://docs.snowflake.com/en/sql-reference/sql/remove + return fmt.Sprintf("REMOVE @%s", filepath.Join(stageName, path)) +} diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index f06e07e85..6f808433a 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -225,3 +225,20 @@ WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT" WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0]) } + +func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) { + { + // Stage name only, no path + assert.Equal(t, + "REMOVE @STAGE_NAME", + SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", ""), + ) + } + { + // Stage name and path + assert.Equal(t, + "REMOVE @STAGE_NAME/path1/subpath2", + SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", "path1/subpath2"), + ) + } +} diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index c1ee740ab..09c0aae20 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -86,6 +86,15 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if _, err = s.Exec(copyCommand); err != nil { + // For non-temp tables, we should try to delete the staging file if COPY INTO fails. + // This is because [PURGE = TRUE] will only delete the staging files upon a successful COPY INTO. + // We also only need to do this for non-temp tables because these staging files will linger, since we create a new temporary table per attempt. + if !createTempTable { + if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil { + slog.Warn("Failed to remove all files from stage", slog.Any("deleteErr", deleteErr)) + } + } + return fmt.Errorf("failed to run copy into temporary table: %w", err) }