Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting Databricks - Part Four #942

Merged
merged 24 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package databricks

import (
"context"
"encoding/csv"
"fmt"
"log/slog"
"os"
"path/filepath"

_ "github.com/databricks/databricks-sql-go"
"github.com/databricks/databricks-sql-go/driverctx"

"github.com/artie-labs/transfer/clients/databricks/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/db"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/values"
)

type Store struct {
db.Store
volume string
cfg config.Config
configMap *types.DwhToTablesConfigMap
}

func describeTableQuery(tableID TableIdentifier) (string, []any) {
return fmt.Sprintf("DESCRIBE TABLE %s", tableID.FullyQualifiedName()), nil
}

func (s Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, types.MergeOpts{})
}

func (s Store) Append(tableData *optimization.TableData, useTempTable bool) error {
return shared.Append(s, tableData, types.AdditionalSettings{UseTempTable: useTempTable})
}

func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
return NewTableIdentifier(topicConfig.Database, topicConfig.Schema, table)
}

func (s Store) Dialect() sql.Dialect {
return dialect.DatabricksDialect{}
}

func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
panic("not implemented")
}

func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
tableID := NewTableIdentifier(tableData.TopicConfig().Database, tableData.TopicConfig().Schema, tableData.Name())
query, args := describeTableQuery(tableID)
return shared.GetTableCfgArgs{
Dwh: s,
TableID: tableID,
ConfigMap: s.configMap,
Query: query,
Args: args,
ColumnNameForName: "col_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "comment",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
}.GetTableConfig()
}

func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
// TODO: Update PrepareTemporaryTable interface to include context
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(s, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
}

fp, err := s.writeTemporaryTableFile(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
}

defer func() {
// In the case where PUT or COPY fails, we'll at least delete the temporary file.
if deleteErr := os.RemoveAll(fp); deleteErr != nil {
slog.Warn("Failed to delete temp file", slog.Any("err", deleteErr), slog.String("filePath", fp))
}
}()

// Upload the local file to DBFS
ctx := driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"})

castedTempTableID, isOk := tempTableID.(TableIdentifier)
if !isOk {
return fmt.Errorf("failed to cast temp table ID to TableIdentifier")
}

dbfsFilePath := fmt.Sprintf("dbfs:/Volumes/%s/%s/%s/%s.csv", castedTempTableID.Database(), castedTempTableID.Schema(), s.volume, castedTempTableID.Table())
putCommand := fmt.Sprintf("PUT '%s' INTO '%s' OVERWRITE", fp, dbfsFilePath)
if _, err = s.ExecContext(ctx, putCommand); err != nil {
return fmt.Errorf("failed to run PUT INTO for temporary table: %w", err)
}

// Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
// We'll need \\\\N here because we need to string escape.
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), dbfsFilePath)
if _, err = s.ExecContext(ctx, copyCommand); err != nil {
return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err)
}

return nil
}

func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
if colVal == nil {
return `\\N`, nil
}

value, err := values.ToString(colVal, colKind)
if err != nil {
return "", err
}

return value, nil
}

func (s Store) writeTemporaryTableFile(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, error) {
fp := filepath.Join(os.TempDir(), fmt.Sprintf("%s.csv", newTableID.FullyQualifiedName()))
file, err := os.Create(fp)
if err != nil {
return "", err
}

defer file.Close()
writer := csv.NewWriter(file)
writer.Comma = '\t'

columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails)
if castErr != nil {
return "", castErr
}

row = append(row, castedValue)
}

if err = writer.Write(row); err != nil {
return "", fmt.Errorf("failed to write to csv: %w", err)
}
}

writer.Flush()
return fp, writer.Error()
}

func LoadStore(cfg config.Config) (Store, error) {
store, err := db.Open("databricks", cfg.Databricks.DSN())
if err != nil {
return Store{}, err
}
return Store{
Store: store,
cfg: cfg,
volume: cfg.Databricks.Volume,
configMap: &types.DwhToTablesConfigMap{},
}, nil
}
17 changes: 17 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/cockroachdb/apd/v3 v3.2.1
github.com/databricks/databricks-sql-go v1.6.1
github.com/getsentry/sentry-go v0.27.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -47,7 +48,9 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
Expand All @@ -65,14 +68,19 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/coreos/go-oidc/v3 v3.5.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.3 // indirect
Expand All @@ -86,13 +94,17 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
Expand All @@ -101,11 +113,15 @@ require (
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/jwx v1.2.30 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/samber/slog-common v0.16.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
Expand Down Expand Up @@ -141,4 +157,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gotest.tools/gotestsum v1.8.2 // indirect
)
Loading