diff --git a/clients/databricks/file.go b/clients/databricks/file.go new file mode 100644 index 000000000..6d541fa95 --- /dev/null +++ b/clients/databricks/file.go @@ -0,0 +1,60 @@ +package databricks + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/artie-labs/transfer/lib/destination/ddl" + "github.com/artie-labs/transfer/lib/typing" +) + +type File struct { + name string + fp string +} + +func NewFile(fileRow map[string]any) (File, error) { + _volName, isOk := fileRow["name"] + if !isOk { + return File{}, fmt.Errorf("name is missing") + } + + volName, err := typing.AssertType[string](_volName) + if err != nil { + return File{}, fmt.Errorf("name is not a string") + } + + _path, isOk := fileRow["path"] + if !isOk { + return File{}, fmt.Errorf("path is missing") + } + + path, err := typing.AssertType[string](_path) + if err != nil { + return File{}, fmt.Errorf("path is not a string") + } + + return File{name: volName, fp: path}, nil +} + +func NewFileFromTableID(tableID TableIdentifier, volume string) File { + name := fmt.Sprintf("%s.csv", tableID.Table()) + return File{ + name: name, + fp: fmt.Sprintf("/Volumes/%s/%s/%s/%s", tableID.Database(), tableID.Schema(), volume, name), + } + +} + +func (f File) ShouldDelete() bool { + return ddl.ShouldDeleteFromName(strings.TrimSuffix(f.name, ".csv")) +} + +func (f File) DBFSFilePath() string { + return filepath.Join("dbfs:", f.fp) +} + +func (f File) FilePath() string { + return f.fp +} diff --git a/clients/databricks/file_test.go b/clients/databricks/file_test.go new file mode 100644 index 000000000..1b510a625 --- /dev/null +++ b/clients/databricks/file_test.go @@ -0,0 +1,72 @@ +package databricks + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/stretchr/testify/assert" +) + +func TestNewFile(t *testing.T) { + { + // Invalid + { + // Missing name + _, err := NewFile(map[string]any{"path": "path"}) + assert.ErrorContains(t, err, "name is missing") + } + { + // Name isn't string + _, err := NewFile(map[string]any{"name": 1, "path": "path"}) + assert.ErrorContains(t, err, "name is not a string") + } + { + // Missing path + _, err := NewFile(map[string]any{"name": "name"}) + assert.ErrorContains(t, err, "path is missing") + } + { + // Path isn't string + _, err := NewFile(map[string]any{"name": "name", "path": 1}) + assert.ErrorContains(t, err, "path is not a string") + } + } + { + // Valid + file, err := NewFile(map[string]any{"name": "name", "path": "path"}) + assert.Nil(t, err) + assert.Equal(t, "name", file.name) + assert.Equal(t, "path", file.FilePath()) + } +} + +func newFile(name string) File { + return File{name: name} +} + +func TestFile_ShouldDelete(t *testing.T) { + { + assert.False(t, File{name: "name.csv"}.ShouldDelete()) + } + { + tablesToDrop := []string{ + "transactions___ARTIE_48GJC_1723663043", + fmt.Sprintf("expired_tbl_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()), + fmt.Sprintf("artie_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()), + } + + for _, tblToDelete := range tablesToDrop { + assert.True(t, newFile(strings.ToLower(tblToDelete)).ShouldDelete(), tblToDelete) + assert.True(t, newFile(strings.ToUpper(tblToDelete)).ShouldDelete(), tblToDelete) + assert.True(t, newFile(tblToDelete).ShouldDelete(), tblToDelete) + } + } +} + +func TestFile_DBFSFilePath(t *testing.T) { + file := NewFileFromTableID(NewTableIdentifier("{DB}", "{SCHEMA}", "{TABLE}"), "{VOLUME}") + assert.Equal(t, "dbfs:/Volumes/{DB}/{SCHEMA}/{VOLUME}/{TABLE}.csv", file.DBFSFilePath()) +} diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 2391a94f3..679f5460c 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -37,7 +37,6 @@ func describeTableQuery(tableID TableIdentifier) (string, []any) { } func (s Store) Merge(ctx context.Context, tableData *optimization.TableData) error { - // TODO: Once the merge is done, we should delete the file from the volume. return shared.Merge(ctx, s, tableData, types.MergeOpts{}) } @@ -119,23 +118,30 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio } }() - // Upload the local file to DBFS ctx = driverctx.NewContextWithStagingInfo(ctx, []string{"/var"}) - castedTempTableID, isOk := tempTableID.(TableIdentifier) if !isOk { return fmt.Errorf("failed to cast temp table ID to TableIdentifier") } - dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/%s/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), s.volume, castedTempTableID.Table()) - putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath) + file := NewFileFromTableID(castedTempTableID, s.volume) + putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, file.DBFSFilePath()) if _, err = s.ExecContext(ctx, putCommand); err != nil { return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err) } + defer func() { + if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(file.FilePath())); err != nil { + slog.Warn("Failed to delete file from volume, it will be garbage collected later", + slog.Any("err", err), + slog.String("filePath", file.FilePath()), + ) + } + }() + // Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html // We'll need \\\\N here because we need to string escape. - copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath) + copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath()) if _, err = s.ExecContext(ctx, copyCommand); err != nil { return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err) } @@ -202,20 +208,20 @@ func (s Store) SweepTemporaryTables(ctx context.Context) error { return fmt.Errorf("failed to sweep files from volumes: %w", err) } - volumes, err := sql.RowsToObjects(rows) + files, err := sql.RowsToObjects(rows) if err != nil { return fmt.Errorf("failed to convert rows to objects: %w", err) } - for _, volume := range volumes { - vol, err := NewVolume(volume) + for _, _file := range files { + file, err := NewFile(_file) if err != nil { return err } - if vol.ShouldDelete() { - if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(vol.Path())); err != nil { - return fmt.Errorf("failed to delete volume: %w", err) + if file.ShouldDelete() { + if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(file.FilePath())); err != nil { + return fmt.Errorf("failed to delete file: %w", err) } } } diff --git a/clients/databricks/volume.go b/clients/databricks/volume.go deleted file mode 100644 index a03084073..000000000 --- a/clients/databricks/volume.go +++ /dev/null @@ -1,46 +0,0 @@ -package databricks - -import ( - "fmt" - "strings" - - "github.com/artie-labs/transfer/lib/destination/ddl" - "github.com/artie-labs/transfer/lib/typing" -) - -type Volume struct { - name string - path string -} - -func NewVolume(volumeRow map[string]any) (Volume, error) { - _volName, isOk := volumeRow["name"] - if !isOk { - return Volume{}, fmt.Errorf("volume name is missing") - } - - volName, err := typing.AssertType[string](_volName) - if err != nil { - return Volume{}, fmt.Errorf("volume name is not a string") - } - - _path, isOk := volumeRow["path"] - if !isOk { - return Volume{}, fmt.Errorf("volume path is missing") - } - - path, err := typing.AssertType[string](_path) - if err != nil { - return Volume{}, fmt.Errorf("volume path is not a string") - } - - return Volume{name: volName, path: path}, nil -} - -func (v Volume) ShouldDelete() bool { - return ddl.ShouldDeleteFromName(strings.TrimSuffix(v.name, ".csv")) -} - -func (v Volume) Path() string { - return v.path -} diff --git a/clients/databricks/volume_test.go b/clients/databricks/volume_test.go deleted file mode 100644 index 499eb9812..000000000 --- a/clients/databricks/volume_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package databricks - -import ( - "fmt" - "strings" - "testing" - "time" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/stretchr/testify/assert" -) - -func TestNewVolume(t *testing.T) { - { - // Invalid - { - // Missing name - _, err := NewVolume(map[string]any{"path": "path"}) - assert.ErrorContains(t, err, "volume name is missing") - } - { - // Name isn't string - _, err := NewVolume(map[string]any{"name": 1, "path": "path"}) - assert.ErrorContains(t, err, "volume name is not a string") - } - { - // Missing path - _, err := NewVolume(map[string]any{"name": "name"}) - assert.ErrorContains(t, err, "volume path is missing") - } - { - // Path isn't string - _, err := NewVolume(map[string]any{"name": "name", "path": 1}) - assert.ErrorContains(t, err, "volume path is not a string") - } - } - { - // Valid - volume, err := NewVolume(map[string]any{"name": "name", "path": "path"}) - assert.Nil(t, err) - assert.Equal(t, "name", volume.name) - assert.Equal(t, "path", volume.path) - } -} - -func newVolume(volName string) Volume { - return Volume{ - name: volName, - } -} - -func TestVolume_ShouldDelete(t *testing.T) { - { - // Should delete - volume := Volume{name: "name.csv"} - assert.False(t, volume.ShouldDelete()) - } - { - // Tables that are eligible to be dropped - tablesToDrop := []string{ - "transactions___ARTIE_48GJC_1723663043", - fmt.Sprintf("expired_tbl_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()), - fmt.Sprintf("artie_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()), - } - - for _, tblToDelete := range tablesToDrop { - assert.True(t, newVolume(strings.ToLower(tblToDelete)).ShouldDelete(), tblToDelete) - assert.True(t, newVolume(strings.ToUpper(tblToDelete)).ShouldDelete(), tblToDelete) - assert.True(t, newVolume(tblToDelete).ShouldDelete(), tblToDelete) - } - } -}