From c30be97b49f507774b246c9210124dc5712d2fc5 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 4 Oct 2024 15:35:21 -0700 Subject: [PATCH 1/4] Clean up --- clients/bigquery/bigquery.go | 5 +++++ clients/databricks/store.go | 5 +++++ clients/mssql/store.go | 2 +- clients/redshift/redshift.go | 2 +- clients/snowflake/snowflake.go | 2 +- lib/destination/dwh.go | 1 + lib/destination/utils/load.go | 27 +++------------------------ main.go | 10 +++++++--- 8 files changed, 24 insertions(+), 30 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index d2883c57d..1e9944aa1 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -212,6 +212,11 @@ func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includ return destination.ExecStatements(s, dedupeQueries) } +func (s *Store) SweepTemporaryTables() error { + // BigQuery doesn't need to sweep temporary tables, since they support setting TTL on temporary tables. + return nil +} + func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) { if _store != nil { // Used for tests. diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 25c388c8e..e9c35f18b 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -184,6 +184,11 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab return fp, writer.Error() } +func (s Store) SweepTemporaryTables() error { + // TODO + return nil +} + func LoadStore(cfg config.Config) (Store, error) { store, err := db.Open("databricks", cfg.Databricks.DSN()) if err != nil { diff --git a/clients/mssql/store.go b/clients/mssql/store.go index f76809e0a..7694d5b33 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -56,7 +56,7 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq return s.specificIdentifierFor(topicConfig, table) } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index cd46578a0..7a09c4311 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -73,7 +73,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab }.GetTableConfig() } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index 5651b19d4..d5c5d5fed 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -40,7 +40,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab }.GetTableConfig() } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index d5adb54f2..9c1430f41 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -17,6 +17,7 @@ type DataWarehouse interface { // SQL specific commands Dialect() sqllib.Dialect Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error + SweepTemporaryTables() error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) Begin() (*sql.Tx, error) diff --git a/lib/destination/utils/load.go b/lib/destination/utils/load.go index 0c7b403f6..8c5f3bf89 100644 --- a/lib/destination/utils/load.go +++ b/lib/destination/utils/load.go @@ -36,36 +36,15 @@ func LoadBaseline(cfg config.Config) (destination.Baseline, error) { func LoadDataWarehouse(cfg config.Config, store *db.Store) (destination.DataWarehouse, error) { switch cfg.Output { case constants.Snowflake: - s, err := snowflake.LoadSnowflake(cfg, store) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up Snowflake: %w", err) - } - return s, nil + return snowflake.LoadSnowflake(cfg, store) case constants.BigQuery: return bigquery.LoadBigQuery(cfg, store) case constants.Databricks: return databricks.LoadStore(cfg) case constants.MSSQL: - s, err := mssql.LoadStore(cfg) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up MS SQL: %w", err) - } - return s, nil + return mssql.LoadStore(cfg) case constants.Redshift: - s, err := redshift.LoadRedshift(cfg, store) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up Redshift: %w", err) - } - return s, nil + return redshift.LoadRedshift(cfg, store) } return nil, fmt.Errorf("invalid data warehouse output source specified: %q", cfg.Output) diff --git a/main.go b/main.go index 927a6815e..ad4026cd5 100644 --- a/main.go +++ b/main.go @@ -39,8 +39,6 @@ func main() { ) ctx := context.Background() - - // Loading telemetry metricsClient := metrics.LoadExporter(settings.Config) var dest destination.Baseline if utils.IsOutputBaseline(settings.Config) { @@ -49,10 +47,16 @@ func main() { logger.Fatal("Unable to load baseline destination", slog.Any("err", err)) } } else { - dest, err = utils.LoadDataWarehouse(settings.Config, nil) + dwh, err := utils.LoadDataWarehouse(settings.Config, nil) if err != nil { logger.Fatal("Unable to load data warehouse destination", slog.Any("err", err)) } + + if err = dwh.SweepTemporaryTables(); err != nil { + logger.Fatal("Failed to clean up temporary tables", slog.Any("err", err)) + } + + dest = dwh } inMemDB := models.NewMemoryDB() From 33970d3ffc7c44228f57ecdc4cb44c6902db6459 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 4 Oct 2024 15:57:29 -0700 Subject: [PATCH 2/4] Databricks sweep. --- clients/databricks/dialect/dialect.go | 10 ++++++++++ clients/databricks/store.go | 12 ++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 14456ec71..6b09c6824 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -143,6 +143,16 @@ WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, )}, nil } +func (DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) { + return fmt.Sprintf(` +SELECT + table_schema, table_name +FROM + %s.information_schema.tables +WHERE + UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} +} + func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy { return sql.Native } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index e9c35f18b..55506dcd6 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -49,6 +49,10 @@ func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql } func (s Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s Store) dialect() dialect.DatabricksDialect { return dialect.DatabricksDialect{} } @@ -185,8 +189,12 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab } func (s Store) SweepTemporaryTables() error { - // TODO - return nil + tcs, err := s.cfg.TopicConfigs() + if err != nil { + return err + } + + return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery) } func LoadStore(cfg config.Config) (Store, error) { From 7434a81a508a9f0764c00fc3e249a315b5711ac1 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 4 Oct 2024 16:13:55 -0700 Subject: [PATCH 3/4] Clean up. --- clients/databricks/dialect/dialect.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 6b09c6824..2e67b84ca 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + dbsql "github.com/databricks/databricks-sql-go" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -143,14 +145,14 @@ WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, )}, nil } -func (DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) { +func (d DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) { return fmt.Sprintf(` SELECT table_schema, table_name FROM %s.information_schema.tables WHERE - UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} + UPPER(table_schema) = UPPER(:p_schema) AND table_name ILIKE :p_artie_prefix`, d.QuoteIdentifier(dbName)), []any{dbsql.Parameter{Name: "p_schema", Value: schemaName}, dbsql.Parameter{Name: "p_artie_prefix", Value: "%" + constants.ArtiePrefix + "%"}} } func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy { From a3b7bdce29650fca7d80e0d60cbcabf256e3f2d5 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 4 Oct 2024 16:14:18 -0700 Subject: [PATCH 4/4] Adding a TODO. --- clients/databricks/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 55506dcd6..817b561c0 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -189,6 +189,7 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab } func (s Store) SweepTemporaryTables() error { + // TODO: We should also remove old volumes tcs, err := s.cfg.TopicConfigs() if err != nil { return err