From a584128584e847d9b3e09a3052527bdd9e30afc8 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 30 Sep 2024 10:12:23 -0700 Subject: [PATCH] WIP. --- clients/databricks/dialect/dialect.go | 5 ++-- clients/databricks/store.go | 39 +++++++++++++++------------ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index c0ca9b581..57021cbbb 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -76,9 +76,8 @@ func (d DatabricksDialect) IsColumnAlreadyExistsErr(_ error) bool { return false } -func (d DatabricksDialect) IsTableDoesNotExistErr(err error) bool { - // Implement the logic to check if the error is a "table does not exist" error - return strings.Contains(err.Error(), "does not exist") +func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool { + return strings.Contains(err.Error(), "[TABLE_OR_VIEW_NOT_FOUND]") } func (d DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 342c71557..b453c8501 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -17,7 +17,12 @@ import ( type Store struct { db.Store - cfg config.Config + cfg config.Config + configMap *types.DwhToTablesConfigMap +} + +func describeTableQuery(tableID TableIdentifier) (string, []any) { + return fmt.Sprintf("DESCRIBE TABLE %s.%s.%s", tableID.Database(), tableID.Schema(), tableID.Table()), nil } func (s Store) Merge(tableData *optimization.TableData) error { @@ -41,20 +46,19 @@ func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, include } func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) { - panic("not implemented") - //tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - //query, args := describeTableQuery(tableID) - //return shared.GetTableCfgArgs{ - // Dwh: s, - // TableID: tableID, - // ConfigMap: s.configMap, - // Query: query, - // Args: args, - // ColumnNameForName: "column_name", - // ColumnNameForDataType: "data_type", - // ColumnNameForComment: "description", - // DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns, - //}.GetTableConfig() + tableID := NewTableIdentifier(tableData.TopicConfig().Database, tableData.TopicConfig().Schema, tableData.Name()) + query, args := describeTableQuery(tableID) + return shared.GetTableCfgArgs{ + Dwh: s, + TableID: tableID, + ConfigMap: s.configMap, + Query: query, + Args: args, + ColumnNameForName: "column_name", + ColumnNameForDataType: "data_type", + ColumnNameForComment: "description", + DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns, + }.GetTableConfig() } func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { @@ -68,7 +72,8 @@ func LoadStore(cfg config.Config) (Store, error) { return Store{}, err } return Store{ - Store: store, - cfg: cfg, + Store: store, + cfg: cfg, + configMap: &types.DwhToTablesConfigMap{}, }, nil }