diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index d2883c57d..1e9944aa1 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -212,6 +212,11 @@ func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includ return destination.ExecStatements(s, dedupeQueries) } +func (s *Store) SweepTemporaryTables() error { + // BigQuery doesn't need to sweep temporary tables, since they support setting TTL on temporary tables. + return nil +} + func LoadBigQuery(cfg config.Config, _store *db.Store) (*Store, error) { if _store != nil { // Used for tests. diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 14456ec71..2e67b84ca 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + dbsql "github.com/databricks/databricks-sql-go" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -143,6 +145,16 @@ WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, )}, nil } +func (d DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) { + return fmt.Sprintf(` +SELECT + table_schema, table_name +FROM + %s.information_schema.tables +WHERE + UPPER(table_schema) = UPPER(:p_schema) AND table_name ILIKE :p_artie_prefix`, d.QuoteIdentifier(dbName)), []any{dbsql.Parameter{Name: "p_schema", Value: schemaName}, dbsql.Parameter{Name: "p_artie_prefix", Value: "%" + constants.ArtiePrefix + "%"}} +} + func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy { return sql.Native } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 25c388c8e..817b561c0 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -49,6 +49,10 @@ func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql } func (s Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s Store) dialect() dialect.DatabricksDialect { return dialect.DatabricksDialect{} } @@ -184,6 +188,16 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab return fp, writer.Error() } +func (s Store) SweepTemporaryTables() error { + // TODO: We should also remove old volumes + tcs, err := s.cfg.TopicConfigs() + if err != nil { + return err + } + + return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery) +} + func LoadStore(cfg config.Config) (Store, error) { store, err := db.Open("databricks", cfg.Databricks.DSN()) if err != nil { diff --git a/clients/mssql/store.go b/clients/mssql/store.go index f76809e0a..7694d5b33 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -56,7 +56,7 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq return s.specificIdentifierFor(topicConfig, table) } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index cd46578a0..7a09c4311 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -73,7 +73,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab }.GetTableConfig() } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index 5651b19d4..d5c5d5fed 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -40,7 +40,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab }.GetTableConfig() } -func (s *Store) Sweep() error { +func (s *Store) SweepTemporaryTables() error { tcs, err := s.config.TopicConfigs() if err != nil { return err diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index d5adb54f2..9c1430f41 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -17,6 +17,7 @@ type DataWarehouse interface { // SQL specific commands Dialect() sqllib.Dialect Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error + SweepTemporaryTables() error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) Begin() (*sql.Tx, error) diff --git a/lib/destination/utils/load.go b/lib/destination/utils/load.go index 0c7b403f6..8c5f3bf89 100644 --- a/lib/destination/utils/load.go +++ b/lib/destination/utils/load.go @@ -36,36 +36,15 @@ func LoadBaseline(cfg config.Config) (destination.Baseline, error) { func LoadDataWarehouse(cfg config.Config, store *db.Store) (destination.DataWarehouse, error) { switch cfg.Output { case constants.Snowflake: - s, err := snowflake.LoadSnowflake(cfg, store) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up Snowflake: %w", err) - } - return s, nil + return snowflake.LoadSnowflake(cfg, store) case constants.BigQuery: return bigquery.LoadBigQuery(cfg, store) case constants.Databricks: return databricks.LoadStore(cfg) case constants.MSSQL: - s, err := mssql.LoadStore(cfg) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up MS SQL: %w", err) - } - return s, nil + return mssql.LoadStore(cfg) case constants.Redshift: - s, err := redshift.LoadRedshift(cfg, store) - if err != nil { - return nil, err - } - if err = s.Sweep(); err != nil { - return nil, fmt.Errorf("failed to clean up Redshift: %w", err) - } - return s, nil + return redshift.LoadRedshift(cfg, store) } return nil, fmt.Errorf("invalid data warehouse output source specified: %q", cfg.Output) diff --git a/main.go b/main.go index 927a6815e..ad4026cd5 100644 --- a/main.go +++ b/main.go @@ -39,8 +39,6 @@ func main() { ) ctx := context.Background() - - // Loading telemetry metricsClient := metrics.LoadExporter(settings.Config) var dest destination.Baseline if utils.IsOutputBaseline(settings.Config) { @@ -49,10 +47,16 @@ func main() { logger.Fatal("Unable to load baseline destination", slog.Any("err", err)) } } else { - dest, err = utils.LoadDataWarehouse(settings.Config, nil) + dwh, err := utils.LoadDataWarehouse(settings.Config, nil) if err != nil { logger.Fatal("Unable to load data warehouse destination", slog.Any("err", err)) } + + if err = dwh.SweepTemporaryTables(); err != nil { + logger.Fatal("Failed to clean up temporary tables", slog.Any("err", err)) + } + + dest = dwh } inMemDB := models.NewMemoryDB() diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index 61e08cb9f..ca40b7068 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -62,7 +62,7 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B return } - retryCfg, err := retry.NewJitterRetryConfig(500, 30_000, 10, retry.AlwaysRetry) + retryCfg, err := retry.NewJitterRetryConfig(1_000, 30_000, 15, retry.AlwaysRetry) if err != nil { slog.Error("Failed to create retry config", slog.Any("err", err)) return