Skip to content

Commit

Permalink
Merge branch 'master' into redshift-string-precision
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 9, 2024
2 parents aa3c8b7 + e473bc7 commit d9813c5
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 9 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includ
return destination.ExecStatements(s, dedupeQueries)
}

func (s *Store) SweepTemporaryTables() error {
// SweepTemporaryTables is a no-op for BigQuery. The context parameter exists only to
// satisfy the destination interface and is intentionally ignored.
func (s *Store) SweepTemporaryTables(_ context.Context) error {
	// BigQuery doesn't need to sweep temporary tables, since they support setting TTL on temporary tables.
	return nil
}
Expand Down
8 changes: 8 additions & 0 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ WHERE
UPPER(table_schema) = UPPER(:p_schema) AND table_name ILIKE :p_artie_prefix`, d.QuoteIdentifier(dbName)), []any{dbsql.Parameter{Name: "p_schema", Value: schemaName}, dbsql.Parameter{Name: "p_artie_prefix", Value: "%" + constants.ArtiePrefix + "%"}}
}

// BuildSweepFilesFromVolumesQuery returns a LIST command that enumerates the files
// staged under /Volumes/<dbName>/<schemaName>/<volumeName>.
func (d DatabricksDialect) BuildSweepFilesFromVolumesQuery(dbName, schemaName, volumeName string) string {
	volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", dbName, schemaName, volumeName)
	return fmt.Sprintf("LIST '%s'", volumePath)
}

// BuildRemoveFileFromVolumeQuery returns a REMOVE command that deletes the file at filePath.
func (d DatabricksDialect) BuildRemoveFileFromVolumeQuery(filePath string) string {
	return "REMOVE '" + filePath + "'"
}

func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Native
}
32 changes: 30 additions & 2 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func describeTableQuery(tableID TableIdentifier) (string, []any) {
}

// Merge delegates to the shared merge flow, which stages the data and merges it into
// the destination table using this store's default merge options.
func (s Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
	// TODO: Once the merge is done, we should delete the file from the volume.
	return shared.Merge(ctx, s, tableData, types.MergeOpts{})
}

Expand Down Expand Up @@ -187,13 +188,40 @@ func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTab
return fp, writer.Error()
}

func (s Store) SweepTemporaryTables() error {
// TODO: We should also remove old volumes
// SweepTemporaryTables cleans up expired temporary artifacts for every configured topic:
// first the staged files inside the Databricks volume, then the temporary tables themselves.
func (s Store) SweepTemporaryTables(ctx context.Context) error {
	tcs, err := s.cfg.TopicConfigs()
	if err != nil {
		return err
	}

	// The Databricks driver requires staging info on the context to run file operations (LIST / REMOVE).
	ctx = driverctx.NewContextWithStagingInfo(ctx, []string{"/var"})

	// Remove the temporary files from volumes. Multiple topic configs can share the same
	// database/schema pair, so dedupe to avoid issuing redundant LIST sweeps.
	sweptSchemas := make(map[string]bool)
	for _, tc := range tcs {
		key := tc.Database + "::" + tc.Schema
		if sweptSchemas[key] {
			continue
		}
		sweptSchemas[key] = true

		rows, err := s.Query(s.dialect().BuildSweepFilesFromVolumesQuery(tc.Database, tc.Schema, s.volume))
		if err != nil {
			return fmt.Errorf("failed to sweep files from volumes: %w", err)
		}

		files, err := sql.RowsToObjects(rows)
		if err != nil {
			return fmt.Errorf("failed to convert rows to objects: %w", err)
		}

		for _, file := range files {
			vol, err := NewVolume(file)
			if err != nil {
				return err
			}

			if vol.ShouldDelete() {
				// REMOVE deletes a single staged file, not the volume itself.
				if _, err = s.ExecContext(ctx, s.dialect().BuildRemoveFileFromVolumeQuery(vol.Path())); err != nil {
					return fmt.Errorf("failed to delete file %q: %w", vol.Path(), err)
				}
			}
		}
	}

	// Delete the temporary tables
	return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery)
}

Expand Down
46 changes: 46 additions & 0 deletions clients/databricks/volume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package databricks

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/typing"
)

// Volume describes a single file entry returned by a Databricks LIST query against a
// volume. NOTE(review): despite the type name, each instance appears to represent one
// staged file (see ShouldDelete / Path usage), not the volume itself.
type Volume struct {
	name string // file name from the "name" column of the LIST result
	path string // full path from the "path" column of the LIST result
}

// NewVolume parses a row returned by a Databricks LIST query into a Volume.
// The row must contain string values under the "name" and "path" keys; otherwise
// an error describing the offending field is returned.
func NewVolume(volumeRow map[string]any) (Volume, error) {
	_volName, isOk := volumeRow["name"]
	if !isOk {
		return Volume{}, fmt.Errorf("volume name is missing")
	}

	// Wrap the assertion error so the underlying cause (the actual type) isn't lost.
	volName, err := typing.AssertType[string](_volName)
	if err != nil {
		return Volume{}, fmt.Errorf("volume name is not a string: %w", err)
	}

	_path, isOk := volumeRow["path"]
	if !isOk {
		return Volume{}, fmt.Errorf("volume path is missing")
	}

	path, err := typing.AssertType[string](_path)
	if err != nil {
		return Volume{}, fmt.Errorf("volume path is not a string: %w", err)
	}

	return Volume{name: volName, path: path}, nil
}

// ShouldDelete reports whether this staged file is an expired temporary artifact.
// The ".csv" suffix is trimmed so the remaining name can be evaluated by
// ddl.ShouldDeleteFromName, which matches the temporary-table naming convention.
func (v Volume) ShouldDelete() bool {
	return ddl.ShouldDeleteFromName(strings.TrimSuffix(v.name, ".csv"))
}

// Path returns the file's full path inside the volume.
func (v Volume) Path() string {
	return v.path
}
72 changes: 72 additions & 0 deletions clients/databricks/volume_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package databricks

import (
"fmt"
"strings"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/stretchr/testify/assert"
)

// TestNewVolume covers malformed rows (missing or non-string fields) and the happy path.
func TestNewVolume(t *testing.T) {
	{
		// Invalid rows
		invalidCases := []struct {
			row         map[string]any
			expectedErr string
		}{
			// Missing name
			{row: map[string]any{"path": "path"}, expectedErr: "volume name is missing"},
			// Name isn't string
			{row: map[string]any{"name": 1, "path": "path"}, expectedErr: "volume name is not a string"},
			// Missing path
			{row: map[string]any{"name": "name"}, expectedErr: "volume path is missing"},
			// Path isn't string
			{row: map[string]any{"name": "name", "path": 1}, expectedErr: "volume path is not a string"},
		}

		for _, testCase := range invalidCases {
			_, err := NewVolume(testCase.row)
			assert.ErrorContains(t, err, testCase.expectedErr)
		}
	}
	{
		// Valid
		volume, err := NewVolume(map[string]any{"name": "name", "path": "path"})
		assert.Nil(t, err)
		assert.Equal(t, "name", volume.name)
		assert.Equal(t, "path", volume.path)
	}
}

// newVolume builds a Volume with only the name populated — sufficient for ShouldDelete tests.
func newVolume(volName string) Volume {
	return Volume{name: volName}
}

// TestVolume_ShouldDelete verifies that plain file names are kept while expired
// Artie-prefixed temporary artifacts are flagged for deletion, case-insensitively.
func TestVolume_ShouldDelete(t *testing.T) {
	{
		// A plain file name with no Artie suffix should NOT be deleted.
		volume := Volume{name: "name.csv"}
		assert.False(t, volume.ShouldDelete())
	}
	{
		// Tables that are eligible to be dropped
		tablesToDrop := []string{
			"transactions___ARTIE_48GJC_1723663043",
			fmt.Sprintf("expired_tbl_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()),
			fmt.Sprintf("artie_%s_suffix_%d", constants.ArtiePrefix, time.Now().Add(-1*constants.TemporaryTableTTL).Unix()),
		}

		// Deletion eligibility should not depend on casing.
		for _, tblToDelete := range tablesToDrop {
			assert.True(t, newVolume(strings.ToLower(tblToDelete)).ShouldDelete(), tblToDelete)
			assert.True(t, newVolume(strings.ToUpper(tblToDelete)).ShouldDelete(), tblToDelete)
			assert.True(t, newVolume(tblToDelete).ShouldDelete(), tblToDelete)
		}
	}
}
2 changes: 1 addition & 1 deletion clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq
return s.specificIdentifierFor(topicConfig, table)
}

func (s *Store) SweepTemporaryTables() error {
func (s *Store) SweepTemporaryTables(_ context.Context) error {
tcs, err := s.config.TopicConfigs()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab
}.GetTableConfig()
}

func (s *Store) SweepTemporaryTables() error {
func (s *Store) SweepTemporaryTables(_ context.Context) error {
tcs, err := s.config.TopicConfigs()
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
}

temporaryTableID := TempTableIDWithSuffix(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())

defer func() {
if dropErr := ddl.DropTemporaryTable(dwh, temporaryTableID, false); dropErr != nil {
slog.Warn("Failed to drop temporary table", slog.Any("err", dropErr), slog.String("tableName", temporaryTableID.FullyQualifiedName()))
Expand Down
3 changes: 2 additions & 1 deletion clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package snowflake

import (
"context"
"fmt"

"github.com/snowflakedb/gosnowflake"
Expand Down Expand Up @@ -40,7 +41,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab
}.GetTableConfig()
}

func (s *Store) SweepTemporaryTables() error {
func (s *Store) SweepTemporaryTables(_ context.Context) error {
tcs, err := s.config.TopicConfigs()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type DataWarehouse interface {
// SQL specific commands
Dialect() sqllib.Dialect
Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error
SweepTemporaryTables() error
SweepTemporaryTables(ctx context.Context) error
Exec(query string, args ...any) (sql.Result, error)
Query(query string, args ...any) (*sql.Rows, error)
Begin() (*sql.Tx, error)
Expand Down
41 changes: 41 additions & 0 deletions lib/sql/rows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sql

import (
"database/sql"
"fmt"
)

// RowsToObjects drains rows and returns each row as a map keyed by column name.
// The rows are always closed before returning.
func RowsToObjects(rows *sql.Rows) ([]map[string]any, error) {
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var out []map[string]any
	for rows.Next() {
		// Scan indirectly through pointers so every column lands in values.
		values := make([]any, len(columns))
		scanTargets := make([]any, len(columns))
		for idx := range values {
			scanTargets[idx] = &values[idx]
		}

		if err = rows.Scan(scanTargets...); err != nil {
			return nil, err
		}

		obj := make(map[string]any, len(columns))
		for idx, columnName := range columns {
			obj[columnName] = values[idx]
		}

		out = append(out, obj)
	}

	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate over rows: %w", err)
	}

	return out, nil
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {
logger.Fatal("Unable to load data warehouse destination", slog.Any("err", err))
}

if err = dwh.SweepTemporaryTables(); err != nil {
if err = dwh.SweepTemporaryTables(ctx); err != nil {
logger.Fatal("Failed to clean up temporary tables", slog.Any("err", err))
}

Expand Down

0 comments on commit d9813c5

Please sign in to comment.