Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 2, 2024
1 parent 23a89fc commit 08b0967
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
17 changes: 5 additions & 12 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

type Store struct {
db.Store
volume string
cfg config.Config
configMap *types.DwhToTablesConfigMap
}
Expand Down Expand Up @@ -73,7 +74,6 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl

func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
// TODO: Update PrepareTemporaryTable interface to include context

if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand All @@ -90,7 +90,6 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon
}
}

// Write data into a temporary file
fp, err := s.writeTemporaryTableFile(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
Expand All @@ -103,22 +102,15 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon
}
}()

castedTempTableID, isOk := tempTableID.(TableIdentifier)
if !isOk {
return fmt.Errorf("failed to cast tempTableID to TableIdentifier")
}

// Upload the local file to DBFS
ctx := driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"})

dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/vol_test/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), tempTableID.Table())
// Use the PUT INTO command to upload the file to Databricks
dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s.csv", s.volume, tempTableID.Table())
putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath)
if _, err = s.ExecContext(ctx, putCommand); err != nil {
return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err)
}

// Use the COPY INTO command to load the data into the temporary table
// Ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
// Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath)
if _, err = s.ExecContext(ctx, copyCommand); err != nil {
return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err)
Expand Down Expand Up @@ -182,6 +174,7 @@ func LoadStore(cfg config.Config) (Store, error) {
return Store{
Store: store,
cfg: cfg,
volume: cfg.Databricks.Volume,
configMap: &types.DwhToTablesConfigMap{},
}, nil
}
1 change: 1 addition & 0 deletions lib/config/destination_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Databricks struct {
Port int `yaml:"port"`
Catalog string `yaml:"catalog"`
PersonalAccessToken string `yaml:"personalAccessToken"`
Volume string `yaml:"volume"`
}

type MSSQL struct {
Expand Down

0 comments on commit 08b0967

Please sign in to comment.