Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 1, 2024
1 parent 87ef8b6 commit 849f26b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
6 changes: 6 additions & 0 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dialect

import (
"fmt"
"path/filepath"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
Expand Down Expand Up @@ -152,3 +153,8 @@ FROM
WHERE
UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"}
}

func (SnowflakeDialect) BuildRemoveAllFilesFromStage(stageName string, path string) string {
// https://docs.snowflake.com/en/sql-reference/sql/remove
return fmt.Sprintf("REMOVE @%s", filepath.Join(stageName, path))
}
17 changes: 17 additions & 0 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,20 @@ WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0])
}

func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) {
{
// Stage name only, no path
assert.Equal(t,
"REMOVE @STAGE_NAME",
SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", ""),
)
}
{
// Stage name and path
assert.Equal(t,
"REMOVE @STAGE_NAME/path1/subpath2",
SnowflakeDialect{}.BuildRemoveAllFilesFromStage("STAGE_NAME", "path1/subpath2"),
)
}
}
1 change: 1 addition & 0 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
}

if _, err = s.Exec(copyCommand); err != nil {

return fmt.Errorf("failed to run copy into temporary table: %w", err)
}

Expand Down

0 comments on commit 849f26b

Please sign in to comment.