Skip to content

Commit

Permalink
[Snowflake] Disable time travel for staging tables (#1061)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 21, 2024
1 parent 1671942 commit 5c148f5
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 4 deletions.
3 changes: 2 additions & 1 deletion clients/snowflake/dialect/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, tempo

if temporary {
// TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table
// DATA_RETENTION_TIME_IN_DAYS = 0 - This will disable time travel on staging tables and reduce storage overhead.
// PURGE syntax - https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#purging-files-after-loading
// FIELD_OPTIONALLY_ENCLOSED_BY - is needed because CSV will try to escape any values that have `"`
return query + ` STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`
return query + ` DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`
} else {
return query
}
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) {

// Temporary:
assert.Equal(t,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
)
// Not temporary:
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() {
_, createQuery, _ := s.fakeStageStore.ExecContextArgsForCall(0)

prefixQuery := fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s ("USER_ID" string,"FIRST_NAME" string,"LAST_NAME" string,"DUSTY" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, tempTableName)
`CREATE TABLE IF NOT EXISTS %s ("USER_ID" string,"FIRST_NAME" string,"LAST_NAME" string,"DUSTY" string) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, tempTableName)
containsPrefix := strings.HasPrefix(createQuery, prefixQuery)
assert.True(s.T(), containsPrefix, fmt.Sprintf("createQuery:%v, prefixQuery:%s", createQuery, prefixQuery))
resourceName := addPrefixToTableName(tempTableID, "%")
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName")
query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)})
assert.NoError(d.T(), err)
assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
}
{
// BigQuery
Expand Down

0 comments on commit 5c148f5

Please sign in to comment.