diff --git a/clients/databricks/store.go b/clients/databricks/store.go index fcf3e6ce3..67d7f7854 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -27,6 +27,7 @@ import ( type Store struct { db.Store + volume string cfg config.Config configMap *types.DwhToTablesConfigMap } @@ -73,7 +74,6 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { // TODO: Update PrepareTemporaryTable interface to include context - if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), @@ -90,7 +90,6 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon } } - // Write data into a temporary file fp, err := s.writeTemporaryTableFile(tableData, tempTableID) if err != nil { return fmt.Errorf("failed to load temporary table: %w", err) @@ -103,22 +102,15 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon } }() - castedTempTableID, isOk := tempTableID.(TableIdentifier) - if !isOk { - return fmt.Errorf("failed to cast tempTableID to TableIdentifier") - } - + // Upload the local file to DBFS ctx := driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"}) - - dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/vol_test/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), tempTableID.Table()) - // Use the PUT INTO command to upload the file to Databricks + dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s.csv", s.volume, tempTableID.Table()) putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath) if _, err = s.ExecContext(ctx, putCommand); err != nil { return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err) } - // Use the COPY INTO command to load the data into the temporary table - // Ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html + // Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath) if _, err = s.ExecContext(ctx, copyCommand); err != nil { return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err) @@ -182,6 +174,7 @@ func LoadStore(cfg config.Config) (Store, error) { return Store{ Store: store, cfg: cfg, + volume: cfg.Databricks.Volume, configMap: &types.DwhToTablesConfigMap{}, }, nil } diff --git a/lib/config/destination_types.go b/lib/config/destination_types.go index 70068dacb..8ce990a36 100644 --- a/lib/config/destination_types.go +++ b/lib/config/destination_types.go @@ -17,6 +17,7 @@ type Databricks struct { Port int `yaml:"port"` Catalog string `yaml:"catalog"` PersonalAccessToken string `yaml:"personalAccessToken"` + Volume string `yaml:"volume"` } type MSSQL struct {