From 33970d3ffc7c44228f57ecdc4cb44c6902db6459 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 4 Oct 2024 15:57:29 -0700 Subject: [PATCH] Databricks sweep. --- clients/databricks/dialect/dialect.go | 10 ++++++++++ clients/databricks/store.go | 12 ++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 14456ec71..6b09c6824 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -143,6 +143,16 @@ WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, )}, nil } +func (DatabricksDialect) BuildSweepQuery(dbName, schemaName string) (string, []any) { + return fmt.Sprintf(` +SELECT + table_schema, table_name +FROM + %s.information_schema.tables +WHERE + UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"} +} + func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy { return sql.Native } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index e9c35f18b..55506dcd6 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -49,6 +49,10 @@ func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql } func (s Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s Store) dialect() dialect.DatabricksDialect { return dialect.DatabricksDialect{} } @@ -185,8 +189,12 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab } func (s Store) SweepTemporaryTables() error { - // TODO - return nil + tcs, err := s.cfg.TopicConfigs() + if err != nil { + return err + } + + return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery) } func LoadStore(cfg config.Config) (Store, error) {