From 849f26b28128155822d05ad4d24f4bd1c2b19fad Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 30 Nov 2024 16:30:05 -0800 Subject: [PATCH 1/4] WIP. --- clients/snowflake/dialect/dialect.go | 6 ++++++ clients/snowflake/dialect/dialect_test.go | 17 +++++++++++++++++ clients/snowflake/staging.go | 1 + 3 files changed, 24 insertions(+) diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 09b341089..54af03e3a 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -2,6 +2,7 @@ package dialect import ( "fmt" + "path/filepath" "strings" "github.com/artie-labs/transfer/lib/config/constants" @@ -152,3 +153,8 @@ FROM WHERE UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} } + +func (SnowflakeDialect) BuildRemoveAllFilesFromStage(stageName string, path string) string { + // https://docs.snowflake.com/en/sql-reference/sql/remove + return fmt.Sprintf("REMOVE @%s", filepath.Join(stageName, path)) +} diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index f06e07e85..30a1784f2 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -225,3 +225,20 @@ WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT" WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0]) } + +func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) { + { + // Stage name only, no path + assert.Equal(t, + "REMOVE @STAGE_NAME", + SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", ""), + ) + } + { + // Stage name and path + assert.Equal(t, + "REMOVE @STAGE_NAME/path1/subpath2", + SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", "path1/subpath2"), + ) + } +} diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index c1ee740ab..9d3a93824 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -86,6 +86,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if _, err = s.Exec(copyCommand); err != nil { + return fmt.Errorf("failed to run copy into temporary table: %w", err) } From 39e81024ecfb18577ba1a67b2d1f2d33ceed8d25 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 30 Nov 2024 16:36:24 -0800 Subject: [PATCH 2/4] Clean up. --- clients/snowflake/staging.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 9d3a93824..b09851d6c 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -86,6 +86,13 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if _, err = s.Exec(copyCommand); err != nil { + // If COPY INTO failed, we should delete the file from the stage + // We only need to do this for non-temp tables (which would be the case for History mode) because we PURGE = TRUE will only delete the file after a successful COPY INTO + if !createTempTable { + if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveAllFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil { + slog.Warn("Failed to remove all files from stage", slog.Any("deleteErr", deleteErr)) + } + } return fmt.Errorf("failed to run copy into temporary table: %w", err) } From 55c839e5074638da130ba9363f16b9d3521af790 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 30 Nov 2024 16:38:51 -0800 Subject: [PATCH 3/4] Update. --- clients/snowflake/staging.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index b09851d6c..67b440a21 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -86,8 +86,9 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if _, err = s.Exec(copyCommand); err != nil { - // If COPY INTO failed, we should delete the file from the stage - // We only need to do this for non-temp tables (which would be the case for History mode) because we PURGE = TRUE will only delete the file after a successful COPY INTO + // For non-temp tables, we should try to delete the staging file if COPY INTO fails. + // This is because [PURGE = TRUE] will only delete the staging files upon a successful COPY INTO. + // We also only need to do this for non-temp tables because these staging files will linger, since we create a new temporary table per attempt. if !createTempTable { if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveAllFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil { slog.Warn("Failed to remove all files from stage", slog.Any("deleteErr", deleteErr)) From 7141cb1502eb8939934096517236e4cdefb3afd3 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 30 Nov 2024 16:39:21 -0800 Subject: [PATCH 4/4] Function rename. --- clients/snowflake/dialect/dialect.go | 2 +- clients/snowflake/dialect/dialect_test.go | 4 ++-- clients/snowflake/staging.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 54af03e3a..811bb7d57 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -154,7 +154,7 @@ WHERE UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} } -func (SnowflakeDialect) BuildRemoveAllFilesFromStage(stageName string, path string) string { +func (SnowflakeDialect) BuildRemoveFilesFromStage(stageName string, path string) string { // https://docs.snowflake.com/en/sql-reference/sql/remove return fmt.Sprintf("REMOVE @%s", filepath.Join(stageName, path)) } diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 30a1784f2..6f808433a 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -231,14 +231,14 @@ func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) { // Stage name only, no path assert.Equal(t, "REMOVE @STAGE_NAME", - SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", ""), + SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", ""), ) } { // Stage name and path assert.Equal(t, "REMOVE @STAGE_NAME/path1/subpath2", - SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", "path1/subpath2"), + SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", "path1/subpath2"), ) } } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 67b440a21..09c0aae20 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -90,7 +90,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati // This is because [PURGE = TRUE] will only delete the staging files upon a successful COPY INTO. // We also only need to do this for non-temp tables because these staging files will linger, since we create a new temporary table per attempt. if !createTempTable { - if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveAllFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil { + if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil { slog.Warn("Failed to remove all files from stage", slog.Any("deleteErr", deleteErr)) } }