From efb1cdc0bf662f023f7b2dbb7de0aa9d8739989a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 10:34:22 -0700 Subject: [PATCH] Databricks: stage files under the configured volume and fix nullValue escaping in COPY INTO --- clients/databricks/store.go | 8 +++++--- lib/config/destination_types.go | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 3cf89d6f2..ba49c8018 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -27,6 +27,7 @@ import ( type Store struct { db.Store + volume string cfg config.Config configMap *types.DwhToTablesConfigMap } @@ -109,14 +110,15 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon return fmt.Errorf("failed to cast temp table ID to TableIdentifier") } - dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s.csv", castedTempTableID.Database(), castedTempTableID.Table()) + dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/%s/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), s.volume, castedTempTableID.Table()) putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath) if _, err = s.ExecContext(ctx, putCommand); err != nil { return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err) } // Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html - copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath) + // castColValStaging writes NULL as \\N; the backslashes must be escaped again inside the SQL string literal, hence \\\\N here.
+ copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath) if _, err = s.ExecContext(ctx, copyCommand); err != nil { return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err) } @@ -125,7 +127,6 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon } func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { - // TODO: Test null values if colVal == nil { return `\\N`, nil } @@ -178,6 +179,7 @@ func LoadStore(cfg config.Config) (Store, error) { return Store{ Store: store, cfg: cfg, + volume: cfg.Databricks.Volume, configMap: &types.DwhToTablesConfigMap{}, }, nil } diff --git a/lib/config/destination_types.go b/lib/config/destination_types.go index 5c3003951..0ff0b3dbe 100644 --- a/lib/config/destination_types.go +++ b/lib/config/destination_types.go @@ -17,6 +17,7 @@ type Databricks struct { Port int `yaml:"port"` Catalog string `yaml:"catalog"` PersonalAccessToken string `yaml:"personalAccessToken"` + Volume string `yaml:"volume"` } type MSSQL struct {