// Patch summary: implements temporary-table sweeping for the Databricks client.
//
// clients/databricks/dialect/dialect.go
//   - Imports github.com/databricks/databricks-sql-go under the alias dbsql.
//   - Adds DatabricksDialect.BuildSweepQuery(dbName, schemaName), which returns a SQL
//     string plus its argument list. The query selects table_schema and table_name
//     from <dbName>.information_schema.tables, keeping rows where
//     UPPER(table_schema) = UPPER(:p_schema) and table_name ILIKE :p_artie_prefix.
//   - Only dbName is interpolated into the SQL text, after d.QuoteIdentifier. The
//     other two values travel as dbsql.Parameter entries: p_schema = schemaName and
//     p_artie_prefix = "%" + constants.ArtiePrefix + "%".
//   - NOTE(review): "_" is a single-character wildcard in LIKE/ILIKE patterns. If
//     constants.ArtiePrefix contains underscores, the filter is looser than a
//     literal substring match -- confirm the consumer of the result set tolerates
//     extra rows.
//
// clients/databricks/store.go
//   - Store.Dialect() now returns s.dialect(); the new unexported dialect() returns
//     dialect.DatabricksDialect{} as its concrete type.
//   - NOTE(review): presumably the concrete type is needed so BuildSweepQuery can be
//     passed as a method value -- confirm whether sql.Dialect declares it.
//   - Store.SweepTemporaryTables() previously returned nil unconditionally. It now
//     loads s.cfg.TopicConfigs(), returns that error unchanged (no added context) on
//     failure, and otherwise returns
//     shared.Sweep(s, tcs, s.dialect().BuildSweepQuery).
//   - The remaining TODO records that old volumes are not removed yet.
//
// NOTE(review): line breaks below follow the hunk line counts. Indentation width
// (tabs for Go, four spaces inside the SQL literal) is assumed -- TODO confirm
// against the repository before applying.
diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go
index 14456ec71..2e67b84ca 100644
--- a/clients/databricks/dialect/dialect.go
+++ b/clients/databricks/dialect/dialect.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"strings"
 
+	dbsql "github.com/databricks/databricks-sql-go"
+
 	"github.com/artie-labs/transfer/lib/config/constants"
 	"github.com/artie-labs/transfer/lib/sql"
 	"github.com/artie-labs/transfer/lib/typing"
@@ -143,6 +145,16 @@ WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
 	)}, nil
 }
 
+func (d DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) {
+	return fmt.Sprintf(`
+SELECT
+    table_schema, table_name
+FROM
+    %s.information_schema.tables
+WHERE
+    UPPER(table_schema) = UPPER(:p_schema) AND table_name ILIKE :p_artie_prefix`, d.QuoteIdentifier(dbName)), []any{dbsql.Parameter{Name: "p_schema", Value: schemaName}, dbsql.Parameter{Name: "p_artie_prefix", Value: "%" + constants.ArtiePrefix + "%"}}
+}
+
 func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
 	return sql.Native
 }
diff --git a/clients/databricks/store.go b/clients/databricks/store.go
index e9c35f18b..817b561c0 100644
--- a/clients/databricks/store.go
+++ b/clients/databricks/store.go
@@ -49,6 +49,10 @@ func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql
 }
 
 func (s Store) Dialect() sql.Dialect {
+	return s.dialect()
+}
+
+func (s Store) dialect() dialect.DatabricksDialect {
 	return dialect.DatabricksDialect{}
 }
 
@@ -185,8 +189,13 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab
 }
 
 func (s Store) SweepTemporaryTables() error {
-	// TODO
-	return nil
+	// TODO: We should also remove old volumes
+	tcs, err := s.cfg.TopicConfigs()
+	if err != nil {
+		return err
+	}
+
+	return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery)
 }
 
 func LoadStore(cfg config.Config) (Store, error) {