From 7af048beeccb00179bd4e7217daf8f3cbc728cd5 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:11:20 -0700 Subject: [PATCH 1/9] Pass `TableIdentifier` to `addPrefixToTableName` --- clients/snowflake/staging.go | 4 ++-- clients/snowflake/staging_test.go | 2 +- clients/snowflake/util.go | 6 ++++-- clients/snowflake/util_test.go | 16 ++++++++-------- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 9fcbb1717..55aa87c14 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -81,7 +81,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo }() // Upload the CSV file to Snowflake - if _, err = s.Exec(fmt.Sprintf("PUT file://%s @%s AUTO_COMPRESS=TRUE", fp, addPrefixToTableName(tempTableName, "%"))); err != nil { + if _, err = s.Exec(fmt.Sprintf("PUT file://%s @%s AUTO_COMPRESS=TRUE", fp, addPrefixToTableName(tempTableID, "%"))); err != nil { return fmt.Errorf("failed to run PUT for temporary table: %w", err) } @@ -91,7 +91,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo Escape: true, DestKind: s.Label(), }), ","), - escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableName, "%")) + escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%")) if additionalSettings.AdditionalCopyClause != "" { copyCommand += " " + additionalSettings.AdditionalCopyClause diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 300ca7d15..84567929a 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -150,7 +150,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() { `CREATE TABLE IF NOT EXISTS %s (user_id string,first_name string,last_name string,dusty string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`, tempTableName) containsPrefix := strings.HasPrefix(createQuery, prefixQuery) assert.True(s.T(), containsPrefix, fmt.Sprintf("createQuery:%v, prefixQuery:%s", createQuery, prefixQuery)) - resourceName := addPrefixToTableName(tempTableName, "%") + resourceName := addPrefixToTableName(tempTableID, "%") // Second call is a PUT putQuery, _ := s.fakeStageStore.ExecArgsForCall(1) assert.Contains(s.T(), putQuery, "PUT file://", putQuery) diff --git a/clients/snowflake/util.go b/clients/snowflake/util.go index a1b6f0ab0..1dd2976e6 100644 --- a/clients/snowflake/util.go +++ b/clients/snowflake/util.go @@ -4,15 +4,17 @@ import ( "fmt" "strings" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing" ) -// addPrefixToTableName will take the fully qualified table name and add a prefix in front of the table +// addPrefixToTableName will take a [types.TableIdentifier] and add a prefix in front of the table // This is necessary for `PUT` commands. The fq name looks like . // Namespace may contain both database and schema. -func addPrefixToTableName(fqTableName string, prefix string) string { +func addPrefixToTableName(tableID types.TableIdentifier, prefix string) string { + fqTableName := tableID.FullyQualifiedName() tableParts := strings.Split(fqTableName, ".") if len(tableParts) == 1 { return prefix + fqTableName diff --git a/clients/snowflake/util_test.go b/clients/snowflake/util_test.go index ba92913e7..e2c66f4ad 100644 --- a/clients/snowflake/util_test.go +++ b/clients/snowflake/util_test.go @@ -14,30 +14,30 @@ func TestAddPrefixToTableName(t *testing.T) { const prefix = "%" type _testCase struct { name string - fqTableName string + fqTableName TableIdentifier expectedFqTableName string } testCases := []_testCase{ { name: "happy path", - fqTableName: "database.schema.tableName", + fqTableName: NewTableIdentifier("database", "schema", "tableName", true), expectedFqTableName: "database.schema.%tableName", }, { name: "tableName only", - fqTableName: "orders", - expectedFqTableName: "%orders", + fqTableName: NewTableIdentifier("", "", "orders", true), + expectedFqTableName: "..%orders", }, { name: "schema and tableName only", - fqTableName: "public.orders", - expectedFqTableName: "public.%orders", + fqTableName: NewTableIdentifier("", "public", "orders", true), + expectedFqTableName: ".public.%orders", }, { name: "db and tableName only", - fqTableName: "db.tableName", - expectedFqTableName: "db.%tableName", + fqTableName: NewTableIdentifier("db", "", "tableName", true), + expectedFqTableName: "db..%tableName", }, } From c551a08dda4672d32e35700ed16a743d7e537038 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:13:14 -0700 Subject: [PATCH 2/9] Simplify --- clients/snowflake/util.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/clients/snowflake/util.go b/clients/snowflake/util.go index 1dd2976e6..e9556cbe6 100644 --- a/clients/snowflake/util.go +++ b/clients/snowflake/util.go @@ -14,14 +14,7 @@ import ( // This is necessary for `PUT` commands. The fq name looks like . // Namespace may contain both database and schema. func addPrefixToTableName(tableID types.TableIdentifier, prefix string) string { - fqTableName := tableID.FullyQualifiedName() - tableParts := strings.Split(fqTableName, ".") - if len(tableParts) == 1 { - return prefix + fqTableName - } - - return fmt.Sprintf("%s.%s%s", - strings.Join(tableParts[0:len(tableParts)-1], "."), prefix, tableParts[len(tableParts)-1]) + return tableID.WithTable(prefix + tableID.Table()).FullyQualifiedName() } // escapeColumns will take columns, filter out invalid, escape and return them in ordered received. From fa9c56a6affaf5f0d2a0c8e66e0c4a6870784929 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:16:05 -0700 Subject: [PATCH 3/9] Inline --- clients/snowflake/staging.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 55aa87c14..a41151eaf 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -48,8 +48,6 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts [] } func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { - tempTableName := tempTableID.FullyQualifiedName() - if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dwh: s, @@ -87,7 +85,8 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo // COPY the CSV file (in Snowflake) into a table copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)", - tempTableName, strings.Join(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(s.ShouldUppercaseEscapedNames(), &sql.NameArgs{ + tempTableID.FullyQualifiedName(), + strings.Join(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(s.ShouldUppercaseEscapedNames(), &sql.NameArgs{ Escape: true, DestKind: s.Label(), }), ","), From 242d08e9dba482eeca53a10bbbce1b83131436b7 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:17:33 -0700 Subject: [PATCH 4/9] Imports --- clients/snowflake/util.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/snowflake/util.go b/clients/snowflake/util.go index e9556cbe6..3d5ef74fd 100644 --- a/clients/snowflake/util.go +++ b/clients/snowflake/util.go @@ -5,9 +5,8 @@ import ( "strings" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" ) // addPrefixToTableName will take a [types.TableIdentifier] and add a prefix in front of the table From f43285781452274249a790de19102e94d9da41e1 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:18:27 -0700 Subject: [PATCH 5/9] Comment --- clients/snowflake/util.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/snowflake/util.go b/clients/snowflake/util.go index 3d5ef74fd..840da632d 100644 --- a/clients/snowflake/util.go +++ b/clients/snowflake/util.go @@ -10,8 +10,7 @@ import ( ) // addPrefixToTableName will take a [types.TableIdentifier] and add a prefix in front of the table -// This is necessary for `PUT` commands. The fq name looks like . -// Namespace may contain both database and schema. +// This is necessary for `PUT` commands. func addPrefixToTableName(tableID types.TableIdentifier, prefix string) string { return tableID.WithTable(prefix + tableID.Table()).FullyQualifiedName() } From bb1452261541f942f9cca1ad3cd1cfa7c6035ff0 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:19:00 -0700 Subject: [PATCH 6/9] Period --- clients/snowflake/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/snowflake/util.go b/clients/snowflake/util.go index 840da632d..ca06ae721 100644 --- a/clients/snowflake/util.go +++ b/clients/snowflake/util.go @@ -9,7 +9,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -// addPrefixToTableName will take a [types.TableIdentifier] and add a prefix in front of the table +// addPrefixToTableName will take a [types.TableIdentifier] and add a prefix in front of the table. // This is necessary for `PUT` commands. func addPrefixToTableName(tableID types.TableIdentifier, prefix string) string { return tableID.WithTable(prefix + tableID.Table()).FullyQualifiedName() From 0013b8423ad76541b2425f315ccdce89b7b68dea Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:19:24 -0700 Subject: [PATCH 7/9] Rename --- clients/snowflake/util_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/snowflake/util_test.go b/clients/snowflake/util_test.go index e2c66f4ad..24b73949f 100644 --- a/clients/snowflake/util_test.go +++ b/clients/snowflake/util_test.go @@ -14,35 +14,35 @@ func TestAddPrefixToTableName(t *testing.T) { const prefix = "%" type _testCase struct { name string - fqTableName TableIdentifier + tableID TableIdentifier expectedFqTableName string } testCases := []_testCase{ { name: "happy path", - fqTableName: NewTableIdentifier("database", "schema", "tableName", true), + tableID: NewTableIdentifier("database", "schema", "tableName", true), expectedFqTableName: "database.schema.%tableName", }, { name: "tableName only", - fqTableName: NewTableIdentifier("", "", "orders", true), + tableID: NewTableIdentifier("", "", "orders", true), expectedFqTableName: "..%orders", }, { name: "schema and tableName only", - fqTableName: NewTableIdentifier("", "public", "orders", true), + tableID: NewTableIdentifier("", "public", "orders", true), expectedFqTableName: ".public.%orders", }, { name: "db and tableName only", - fqTableName: NewTableIdentifier("db", "", "tableName", true), + tableID: NewTableIdentifier("db", "", "tableName", true), expectedFqTableName: "db..%tableName", }, } for _, testCase := range testCases { - assert.Equal(t, addPrefixToTableName(testCase.fqTableName, prefix), testCase.expectedFqTableName, testCase.name) + assert.Equal(t, addPrefixToTableName(testCase.tableID, prefix), testCase.expectedFqTableName, testCase.name) } } From ea432321027acfc9f4d6beb87ce3bb61db05a811 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:20:42 -0700 Subject: [PATCH 8/9] Swap --- clients/snowflake/util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/snowflake/util_test.go b/clients/snowflake/util_test.go index 24b73949f..63f3b4ad2 100644 --- a/clients/snowflake/util_test.go +++ b/clients/snowflake/util_test.go @@ -42,7 +42,7 @@ func TestAddPrefixToTableName(t *testing.T) { } for _, testCase := range testCases { - assert.Equal(t, addPrefixToTableName(testCase.tableID, prefix), testCase.expectedFqTableName, testCase.name) + assert.Equal(t, testCase.expectedFqTableName, addPrefixToTableName(testCase.tableID, prefix), testCase.name) } } From 0e5d1a9da3c822d09e8e49309fc49739447c71ae Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Sun, 21 Apr 2024 17:32:14 -0700 Subject: [PATCH 9/9] Add TODOs --- clients/shared/sweep.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/shared/sweep.go b/clients/shared/sweep.go index 3bd17b27f..155446f60 100644 --- a/clients/shared/sweep.go +++ b/clients/shared/sweep.go @@ -13,6 +13,7 @@ type GetQueryFunc func(dbAndSchemaPair kafkalib.DatabaseSchemaPair) (string, []a func Sweep(dwh destination.DataWarehouse, topicConfigs []*kafkalib.TopicConfig, getQueryFunc GetQueryFunc) error { slog.Info("Looking to see if there are any dangling artie temporary tables to delete...") + // TODO: Rewrite this to use [DataWarehouse.IdentifierFor] dbAndSchemaPairs := kafkalib.GetUniqueDatabaseAndSchema(topicConfigs) for _, dbAndSchemaPair := range dbAndSchemaPairs { query, args := getQueryFunc(dbAndSchemaPair) @@ -29,6 +30,7 @@ func Sweep(dwh destination.DataWarehouse, topicConfigs []*kafkalib.TopicConfig, } if ddl.ShouldDeleteFromName(tableName) { + // TODO: Rewrite this to pass a [types.TableIdentifiers] to [DropTemporaryTable] err = ddl.DropTemporaryTable(dwh, fmt.Sprintf("%s.%s.%s", dbAndSchemaPair.Database, tableSchema, tableName), true) if err != nil { return err