Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 9, 2024
1 parent e473bc7 commit 72cf250
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 130 deletions.
60 changes: 60 additions & 0 deletions clients/databricks/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package databricks

import (
"fmt"
"path/filepath"
"strings"

"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/typing"
)

// File describes a single file stored in a Databricks volume.
type File struct {
	// name is the file's base name (e.g. "<table>.csv").
	// fp is the file's path without the "dbfs:" scheme prefix.
	name, fp string
}

// NewFile builds a File from a row object (column name -> value). It reads the
// "name" and "path" entries; each must be present and hold a string, otherwise
// an error naming the offending entry is returned along with a zero File.
func NewFile(fileRow map[string]any) (File, error) {
	var file File

	// The name is validated first, so a row that is missing both entries
	// reports the name problem.
	rawName, hasName := fileRow["name"]
	if !hasName {
		return File{}, fmt.Errorf("name is missing")
	}

	name, nameErr := typing.AssertType[string](rawName)
	if nameErr != nil {
		return File{}, fmt.Errorf("name is not a string")
	}
	file.name = name

	rawPath, hasPath := fileRow["path"]
	if !hasPath {
		return File{}, fmt.Errorf("path is missing")
	}

	filePath, pathErr := typing.AssertType[string](rawPath)
	if pathErr != nil {
		return File{}, fmt.Errorf("path is not a string")
	}
	file.fp = filePath

	return file, nil
}

// NewFileFromTableID derives the CSV file that corresponds to a table: the file
// is named "<table>.csv" and its path is
// /Volumes/<database>/<schema>/<volume>/<table>.csv.
func NewFileFromTableID(tableID TableIdentifier, volume string) File {
	var file File
	file.name = fmt.Sprintf("%s.csv", tableID.Table())
	file.fp = fmt.Sprintf("/Volumes/%s/%s/%s/%s", tableID.Database(), tableID.Schema(), volume, file.name)
	return file
}

// ShouldDelete reports whether this file should be removed. The decision is
// delegated to ddl.ShouldDeleteFromName, which is given the file name with any
// trailing ".csv" stripped.
func (f File) ShouldDelete() bool {
	tableName := strings.TrimSuffix(f.name, ".csv")
	return ddl.ShouldDeleteFromName(tableName)
}

// DBFSFilePath returns the file's path prefixed with the "dbfs:" scheme,
// e.g. "dbfs:/Volumes/<database>/<schema>/<volume>/<table>.csv".
//
// NOTE(review): filepath.Join uses the host OS path separator and cleans the
// result; confirm this only ever runs where the separator is '/'.
func (f File) DBFSFilePath() string {
	const scheme = "dbfs:"
	return filepath.Join(scheme, f.fp)
}

// FilePath returns the stored path of the file as-is, without the "dbfs:"
// scheme prefix.
func (f File) FilePath() string {
	volumePath := f.fp
	return volumePath
}
72 changes: 72 additions & 0 deletions clients/databricks/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package databricks

import (
"fmt"
"strings"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/stretchr/testify/assert"
)

// TestNewFile checks that NewFile rejects rows with a missing or non-string
// "name"/"path" entry and accepts a row where both are strings.
func TestNewFile(t *testing.T) {
	// Invalid rows, each paired with the error message it must produce.
	invalidCases := []struct {
		row         map[string]any
		expectedErr string
	}{
		// Missing name
		{row: map[string]any{"path": "path"}, expectedErr: "name is missing"},
		// Name isn't string
		{row: map[string]any{"name": 1, "path": "path"}, expectedErr: "name is not a string"},
		// Missing path
		{row: map[string]any{"name": "name"}, expectedErr: "path is missing"},
		// Path isn't string
		{row: map[string]any{"name": "name", "path": 1}, expectedErr: "path is not a string"},
	}

	for _, tc := range invalidCases {
		_, err := NewFile(tc.row)
		assert.ErrorContains(t, err, tc.expectedErr)
	}

	// Valid
	validRow := map[string]any{"name": "name", "path": "path"}
	file, err := NewFile(validRow)
	assert.Nil(t, err)
	assert.Equal(t, "name", file.name)
	assert.Equal(t, "path", file.FilePath())
}

// newFile is a test helper that returns a File with only its name set.
func newFile(name string) File {
	var file File
	file.name = name
	return file
}

func TestFile_ShouldDelete(t *testing.T) {
{
assert.False(t, File{name: "name.csv"}.ShouldDelete())
}
{
tablesToDrop := []string{
"transactions___ARTIE_48GJC_1723663043",
fmt.Sprintf("expired_tbl_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()),
fmt.Sprintf("artie_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()),
}

for _, tblToDelete := range tablesToDrop {
assert.True(t, newFile(strings.ToLower(tblToDelete)).ShouldDelete(), tblToDelete)
assert.True(t, newFile(strings.ToUpper(tblToDelete)).ShouldDelete(), tblToDelete)
assert.True(t, newFile(tblToDelete).ShouldDelete(), tblToDelete)
}
}
}

func TestFile_DBFSFilePath(t *testing.T) {
file := NewFileFromTableID(NewTableIdentifier("{DB}", "{SCHEMA}", "{TABLE}"), "{VOLUME}")
assert.Equal(t, "dbfs:/Volumes/{DB}/{SCHEMA}/{VOLUME}/{TABLE}.csv", file.DBFSFilePath())
}
30 changes: 18 additions & 12 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func describeTableQuery(tableID TableIdentifier) (string, []any) {
}

// Merge merges tableData into the destination by delegating to shared.Merge,
// passing this store and zero-value merge options. Any error from shared.Merge
// is returned unchanged.
func (s Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
	// TODO: Once the merge is done, we should delete the file from the volume.
	return shared.Merge(ctx, s, tableData, types.MergeOpts{})
}

Expand Down Expand Up @@ -119,23 +118,30 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio
}
}()

// Upload the local file to DBFS
ctx = driverctx.NewContextWithStagingInfo(ctx, []string{"/var"})

castedTempTableID, isOk := tempTableID.(TableIdentifier)
if !isOk {
return fmt.Errorf("failed to cast temp table ID to TableIdentifier")
}

dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/%s/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), s.volume, castedTempTableID.Table())
putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath)
file := NewFileFromTableID(castedTempTableID, s.volume)
putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, file.DBFSFilePath())
if _, err = s.ExecContext(ctx, putCommand); err != nil {
return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err)
}

defer func() {
if _, err = s.Exec(s.dialect().BuildRemoveFileFromVolumeQuery(file.FilePath())); err != nil {
slog.Warn("Failed to delete file from volume, it will be garbage collected later",
slog.Any("err", err),
slog.String("filePath", file.FilePath()),
)
}
}()

// Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
// We'll need \\\\N here because we need to string escape.
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath)
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath())
if _, err = s.ExecContext(ctx, copyCommand); err != nil {
return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err)
}
Expand Down Expand Up @@ -202,20 +208,20 @@ func (s Store) SweepTemporaryTables(ctx context.Context) error {
return fmt.Errorf("failed to sweep files from volumes: %w", err)
}

volumes, err := sql.RowsToObjects(rows)
files, err := sql.RowsToObjects(rows)
if err != nil {
return fmt.Errorf("failed to convert rows to objects: %w", err)
}

for _, volume := range volumes {
vol, err := NewVolume(volume)
for _, _file := range files {
file, err := NewFile(_file)
if err != nil {
return err
}

if vol.ShouldDelete() {
if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(vol.Path())); err != nil {
return fmt.Errorf("failed to delete volume: %w", err)
if file.ShouldDelete() {
if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(file.FilePath())); err != nil {
return fmt.Errorf("failed to delete file: %w", err)
}
}
}
Expand Down
46 changes: 0 additions & 46 deletions clients/databricks/volume.go

This file was deleted.

72 changes: 0 additions & 72 deletions clients/databricks/volume_test.go

This file was deleted.

0 comments on commit 72cf250

Please sign in to comment.