diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 2ea8e49ba..43a25b71d 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -160,7 +160,7 @@ func (d DatabricksDialect) BuildSweepFilesFromVolumesQuery(dbName, schemaName, v } func (d DatabricksDialect) BuildRemoveFileFromVolumeQuery(filePath string) string { - return fmt.Sprintf("REMOVE '%s", filePath) + return fmt.Sprintf("REMOVE '%s'", filePath) } func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy { diff --git a/clients/databricks/store.go b/clients/databricks/store.go index c49e474e1..4cb81ede4 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -190,6 +190,8 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab } func (s Store) SweepTemporaryTables() error { + // TODO: Update interface to include context + ctx := driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"}) tcs, err := s.cfg.TopicConfigs() if err != nil { return err @@ -197,7 +199,6 @@ func (s Store) SweepTemporaryTables() error { // Remove the temporary files from volumes for _, tc := range tcs { - fmt.Println("s.dialect().BuildSweepFilesFromVolumesQuery(tc.Database, tc.Schema, s.volume)", s.dialect().BuildSweepFilesFromVolumesQuery(tc.Database, tc.Schema, s.volume)) rows, err := s.Query(s.dialect().BuildSweepFilesFromVolumesQuery(tc.Database, tc.Schema, s.volume)) if err != nil { return fmt.Errorf("failed to sweep files from volumes: %w", err) @@ -215,7 +216,7 @@ func (s Store) SweepTemporaryTables() error { } if vol.ShouldDelete() { - if _, err = s.Exec(s.dialect().BuildRemoveFileFromVolumeQuery(vol.Path())); err != nil { + if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(vol.Path())); err != nil { return fmt.Errorf("failed to delete volume: %w", err) } }