From 5c148f560b857903b1c77dd6c6ee819ef6060c1b Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Thu, 21 Nov 2024 09:23:42 -0800
Subject: [PATCH] [Snowflake] Disable time travel for staging tables (#1061)

---
 clients/snowflake/dialect/ddl.go          | 3 ++-
 clients/snowflake/dialect/dialect_test.go | 2 +-
 clients/snowflake/staging_test.go         | 2 +-
 lib/destination/ddl/ddl_temp_test.go      | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/clients/snowflake/dialect/ddl.go b/clients/snowflake/dialect/ddl.go
index dd864960b..ff0b5b699 100644
--- a/clients/snowflake/dialect/ddl.go
+++ b/clients/snowflake/dialect/ddl.go
@@ -12,9 +12,10 @@ func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, tempo
 
 	if temporary {
 		// TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table
+		// DATA_RETENTION_TIME_IN_DAYS = 0 - This will disable time travel on staging tables and reduce storage overhead.
 		// PURGE syntax - https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#purging-files-after-loading
 		// FIELD_OPTIONALLY_ENCLOSED_BY - is needed because CSV will try to escape any values that have `"`
-		return query + ` STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`
+		return query + ` DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`
 	} else {
 		return query
 	}
diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go
index c44732e33..f06e07e85 100644
--- a/clients/snowflake/dialect/dialect_test.go
+++ b/clients/snowflake/dialect/dialect_test.go
@@ -36,7 +36,7 @@ func TestSnowflakeDialect_BuildCreateTableQuery(t *testing.T) {
 
 	// Temporary:
 	assert.Equal(t,
-		`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
+		`CREATE TABLE IF NOT EXISTS {TABLE} ({PART_1},{PART_2}) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
 		SnowflakeDialect{}.BuildCreateTableQuery(fakeTableID, true, []string{"{PART_1}", "{PART_2}"}),
 	)
 	// Not temporary:
diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go
index 0e93061b6..98e4933b8 100644
--- a/clients/snowflake/staging_test.go
+++ b/clients/snowflake/staging_test.go
@@ -166,7 +166,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() {
 
 	_, createQuery, _ := s.fakeStageStore.ExecContextArgsForCall(0)
 	prefixQuery := fmt.Sprintf(
-		`CREATE TABLE IF NOT EXISTS %s ("USER_ID" string,"FIRST_NAME" string,"LAST_NAME" string,"DUSTY" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, tempTableName)
+		`CREATE TABLE IF NOT EXISTS %s ("USER_ID" string,"FIRST_NAME" string,"LAST_NAME" string,"DUSTY" string) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, tempTableName)
 	containsPrefix := strings.HasPrefix(createQuery, prefixQuery)
 	assert.True(s.T(), containsPrefix, fmt.Sprintf("createQuery:%v, prefixQuery:%s", createQuery, prefixQuery))
 	resourceName := addPrefixToTableName(tempTableID, "%")
diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go
index bd2e2f2bf..b9a615c6d 100644
--- a/lib/destination/ddl/ddl_temp_test.go
+++ b/lib/destination/ddl/ddl_temp_test.go
@@ -23,7 +23,7 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
 		tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName")
 		query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)})
 		assert.NoError(d.T(), err)
-		assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
+		assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) DATA_RETENTION_TIME_IN_DAYS = 0 STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
 	}
 	{
 		// BigQuery
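
Illustration (not part of this patch): a minimal sketch of calling BuildCreateTableQuery directly to see the staging-table DDL with the new DATA_RETENTION_TIME_IN_DAYS = 0 clause, mirroring what the updated tests assert. The module import path, table name, and column parts are assumptions for the example.

package main

import (
	"fmt"

	// Assumed module path for this repo; adjust if the module is named differently.
	"github.com/artie-labs/transfer/clients/snowflake/dialect"
)

func main() {
	// Build the DDL for a temporary (staging) table.
	tableID := dialect.NewTableIdentifier("db", "schema", "staging_table")
	query := dialect.SnowflakeDialect{}.BuildCreateTableQuery(tableID, true, []string{`"ID" string`, `"NAME" string`})

	// Expected to contain "DATA_RETENTION_TIME_IN_DAYS = 0" right after the column list,
	// followed by the existing STAGE_COPY_OPTIONS and STAGE_FILE_FORMAT clauses.
	fmt.Println(query)
}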