From 849f26b28128155822d05ad4d24f4bd1c2b19fad Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 30 Nov 2024 16:30:05 -0800 Subject: [PATCH] WIP. --- clients/snowflake/dialect/dialect.go | 6 ++++++ clients/snowflake/dialect/dialect_test.go | 17 +++++++++++++++++ clients/snowflake/staging.go | 1 + 3 files changed, 24 insertions(+) diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 09b341089..54af03e3a 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -2,6 +2,7 @@ package dialect import ( "fmt" + "path/filepath" "strings" "github.com/artie-labs/transfer/lib/config/constants" @@ -152,3 +153,8 @@ FROM WHERE UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} } + +func (SnowflakeDialect) BuildRemoveAllFilesFromStage(stageName string, path string) string { + // https://docs.snowflake.com/en/sql-reference/sql/remove + return fmt.Sprintf("REMOVE @%s", filepath.Join(stageName, path)) +} diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index f06e07e85..30a1784f2 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -225,3 +225,20 @@ WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT" WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0]) } + +func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) { + { + // Stage name only, no path + assert.Equal(t, + "REMOVE @STAGE_NAME", + SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", ""), + ) + } + { + // Stage name and path + assert.Equal(t, + "REMOVE @STAGE_NAME/path1/subpath2", + SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", "path1/subpath2"), + ) + } +} diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index c1ee740ab..9d3a93824 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -86,6 +86,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if _, err = s.Exec(copyCommand); err != nil { + return fmt.Errorf("failed to run copy into temporary table: %w", err) }