WIP.
Tang8330 committed Sep 28, 2024
1 parent 4af6682 commit c9cf389
Showing 5 changed files with 225 additions and 21 deletions.
168 changes: 168 additions & 0 deletions clients/databricks/dialect/dialect.go
@@ -0,0 +1,168 @@
package dialect

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

type DatabricksDialect struct{}

func (d DatabricksDialect) QuoteIdentifier(identifier string) string {
return fmt.Sprintf("`%s`", identifier)
}

func (d DatabricksDialect) EscapeStruct(value string) string {
// Note: doubling backticks reads like identifier escaping; struct values will likely need literal escaping instead.
return strings.ReplaceAll(value, "`", "``")
}

func (d DatabricksDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
// https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
switch kd.Kind {
case typing.String.Kind:
return "STRING"
case typing.Integer.Kind:
return "INT"
case typing.Float.Kind:
return "FLOAT"
case typing.EDecimal.Kind:
// Databricks' DECIMAL(precision, scale) syntax matches Snowflake's, so the Snowflake mapping is reused here.
return kd.ExtendedDecimalDetails.SnowflakeKind()
case typing.Boolean.Kind:
return "BOOLEAN"
case typing.ETime.Kind:
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTzKindType:
return "TIMESTAMP"
case ext.DateKindType:
return "DATE"
case ext.TimeKindType:
// Databricks doesn't have an explicit TIME type, so we use STRING instead
return "STRING"
}
case typing.Struct.Kind:
return "VARIANT"
case typing.Array.Kind:
// This is because Databricks requires typing within the element of an array (similar to BigQuery).
return "ARRAY<STRING>"
}

return kd.Kind
}

func (d DatabricksDialect) KindForDataType(_type string, _ string) (typing.KindDetails, error) {
// Reverse mapping from Databricks data types back to KindDetails.
switch strings.ToUpper(_type) {
case "STRING":
return typing.String, nil
case "INT":
return typing.Integer, nil
case "FLOAT":
return typing.Float, nil
case "BOOLEAN":
return typing.Boolean, nil
case "VARIANT":
return typing.Struct, nil
}

return typing.KindDetails{}, fmt.Errorf("unsupported data type: %q", _type)
}

func (d DatabricksDialect) IsColumnAlreadyExistsErr(err error) bool {
// Heuristic: match on the "already exists" substring in the error message.
return strings.Contains(err.Error(), "already exists")
}

func (d DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
// Heuristic: match on the "does not exist" substring in the error message.
return strings.Contains(err.Error(), "does not exist")
}

func (d DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
temp := ""
if temporary {
temp = "TEMPORARY "
}
return fmt.Sprintf("CREATE %sTABLE %s (%s)", temp, tableID.FullyQualifiedName(), strings.Join(colSQLParts, ", "))
}

func (d DatabricksDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
// column.Name is a method, so it must be called; passing the method value tripped vet (SA5009) in CI.
return fmt.Sprintf("%s.%s IS NOT NULL", tableAlias, column.Name())
}

func (d DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
// TODO: Databricks SQL has no ROWID pseudo-column, so this query needs a different dedupe strategy.
return fmt.Sprintf("DELETE FROM %s WHERE ROWID NOT IN (SELECT MAX(ROWID) FROM %s GROUP BY %s)", tableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeys, ", "))
}

func (d DatabricksDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
// TODO: honor includeArtieUpdatedAt; the ordering column is hardcoded to updated_at for now.
var queries []string
queries = append(queries, fmt.Sprintf("CREATE OR REPLACE TEMPORARY VIEW %s AS SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1",
stagingTableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeys, ", "), "updated_at DESC"))
// Joining bare primary-key names with AND does not yield valid predicates; build explicit equality comparisons instead.
var equalityParts []string
for _, pk := range primaryKeys {
equalityParts = append(equalityParts, fmt.Sprintf("t.%s = s.%s", pk, pk))
}
queries = append(queries, fmt.Sprintf("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM %s s WHERE %s)", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName(), strings.Join(equalityParts, " AND ")))
queries = append(queries, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName()))
return queries
}

func (d DatabricksDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
primaryKeys []columns.Column,
additionalEqualityStrings []string,
cols []columns.Column,
softDelete bool,
containsHardDeletes bool,
) ([]string, error) {
equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d)
if len(additionalEqualityStrings) > 0 {
equalitySQLParts = append(equalitySQLParts, additionalEqualityStrings...)
}
baseQuery := fmt.Sprintf(`
MERGE INTO %s %s USING ( %s ) AS %s ON %s`,
tableID.FullyQualifiedName(), constants.TargetAlias, subQuery, constants.StagingAlias, strings.Join(equalitySQLParts, " AND "),
)

cols, err := columns.RemoveOnlySetDeleteColumnMarker(cols)
if err != nil {
return []string{}, err
}

if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN MATCHED AND IFNULL(%s, false) = true THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, d),
strings.Join(sql.QuoteColumns(cols, d), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
)}, nil
}

cols, err = columns.RemoveDeleteColumnMarker(cols)
if err != nil {
return []string{}, err
}

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), strings.Join(sql.QuoteColumns(cols, d), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
)}, nil
}

func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Backfill
}
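
A quick round-trip check of the type mappings above is cheap to add. A sketch, assuming the repo's existing testify dependency; TIME, arrays, and decimals are left out because they intentionally don't round-trip (TIME and arrays both land on STRING/ARRAY&lt;STRING&gt;, and decimals defer to SnowflakeKind()):

package dialect

import (
	"testing"

	"github.com/artie-labs/transfer/lib/typing"
	"github.com/stretchr/testify/assert"
)

func TestDatabricksDialect_TypeRoundTrip(t *testing.T) {
	d := DatabricksDialect{}
	for _, kd := range []typing.KindDetails{typing.String, typing.Integer, typing.Float, typing.Boolean, typing.Struct} {
		// DataTypeForKind and KindForDataType should invert each other for these kinds.
		dataType := d.DataTypeForKind(kd, false)
		returnedKd, err := d.KindForDataType(dataType, "")
		assert.NoError(t, err, dataType)
		assert.Equal(t, kd.Kind, returnedKd.Kind, dataType)
	}
}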
43 changes: 28 additions & 15 deletions clients/databricks/store.go
@@ -1,6 +1,7 @@
package databricks

import (
	"github.com/artie-labs/transfer/clients/databricks/dialect"
	"github.com/artie-labs/transfer/clients/shared"
	"github.com/artie-labs/transfer/lib/config"
	"github.com/artie-labs/transfer/lib/db"
	"github.com/artie-labs/transfer/lib/destination/types"
@@ -15,40 +16,52 @@ type Store struct {
	// configMap is referenced by GetTableConfig below but was missing from this commit (CI: "s.configMap undefined"); assuming the same cache field the other stores use.
	configMap *types.DwhToTablesConfigMap
}

func (s Store) Merge(tableData *optimization.TableData) error {
	return shared.Merge(s, tableData, types.MergeOpts{})
}

func (s Store) Append(tableData *optimization.TableData, useTempTable bool) error {
	return shared.Append(s, tableData, types.AdditionalSettings{UseTempTable: useTempTable})
}

func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
	// Assumes a NewTableIdentifier constructor alongside this store; it is not defined in this commit yet.
	return NewTableIdentifier(topicConfig.Schema, table)
}

func (s Store) Dialect() sql.Dialect {
	return dialect.DatabricksDialect{}
}

func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
	return shared.Dedupe(s, tableID, primaryKeys, includeArtieUpdatedAt)
}

func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
	tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
	// describeTableQuery is not defined in this commit; a sketch follows this file's diff.
	query, args := describeTableQuery(tableID)
	return shared.GetTableCfgArgs{
		Dwh:                   s,
		TableID:               tableID,
		ConfigMap:             s.configMap,
		Query:                 query,
		Args:                  args,
		ColumnNameForName:     "column_name",
		ColumnNameForDataType: "data_type",
		ColumnNameForComment:  "description",
		DropDeletedColumns:    tableData.TopicConfig().DropDeletedColumns,
	}.GetTableConfig()
}

func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
	// Assumes the shared.PrepareTemporaryTable helper the author pencilled in; not verified in this commit.
	return shared.PrepareTemporaryTable(s, tableData, tableConfig, tempTableID, parentTableID, additionalSettings, createTempTable)
}

func LoadStore(cfg config.Config) (Store, error) {
	store, err := db.Open("databricks", cfg.Databricks.DSN())
	if err != nil {
		return Store{}, err
	}
	return Store{
		Store: store,
		cfg:   cfg,
	}, nil
}
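
GetTableConfig above calls a describeTableQuery helper that is not part of this commit (CI flagged query and args as undefined). A minimal sketch of what it could look like, assuming Databricks' DESCRIBE TABLE statement; note that DESCRIBE TABLE actually returns col_name, data_type, and comment columns, which don't line up with the column_name/description keys GetTableConfig currently expects:

package databricks

import (
	"fmt"

	"github.com/artie-labs/transfer/lib/sql"
)

// describeTableQuery builds the metadata query that GetTableConfig runs.
// DESCRIBE TABLE takes no bind parameters, so args is always nil.
func describeTableQuery(tableID sql.TableIdentifier) (string, []any) {
	return fmt.Sprintf("DESCRIBE TABLE %s", tableID.FullyQualifiedName()), nil
}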
13 changes: 12 additions & 1 deletion lib/config/destination_types.go
@@ -1,6 +1,8 @@
package config

import "github.com/artie-labs/transfer/lib/config/constants"
import (
"github.com/artie-labs/transfer/lib/config/constants"
)

type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
@@ -51,3 +53,12 @@ type Snowflake struct {
Host string `yaml:"host"`
Application string `yaml:"application"`
}

type Databricks struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
Protocol string `yaml:"protocol"`
}
11 changes: 11 additions & 0 deletions lib/config/destinations.go
@@ -72,3 +72,14 @@ func (s Snowflake) ToConfig() (*gosnowflake.Config, error) {

return cfg, nil
}

func (d Databricks) DSN() string {
return fmt.Sprintf("%s://%s:%s@%s:%d/%s",
d.Protocol,
url.QueryEscape(d.Username),
url.QueryEscape(d.Password),
d.Host,
d.Port,
d.Database,
)
}
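
As a sanity check on DSN(), here is what it produces for sample values (all made up; whether the Databricks SQL driver accepts this protocol://user:password@host:port/database shape is untested in this commit):

package main

import (
	"fmt"

	"github.com/artie-labs/transfer/lib/config"
)

func main() {
	d := config.Databricks{
		Host:     "dbc-12345.cloud.databricks.com",
		Port:     443,
		Username: "token",
		Password: "dapi-example", // DSN() URL-escapes both credentials
		Database: "prod",
		Protocol: "https",
	}
	// Prints: https://token:dapi-example@dbc-12345.cloud.databricks.com:443/prod
	fmt.Println(d.DSN())
}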
11 changes: 6 additions & 5 deletions lib/config/types.go
@@ -61,11 +61,12 @@ type Config struct {
Kafka *Kafka `yaml:"kafka,omitempty"`

// Supported destinations
BigQuery *BigQuery `yaml:"bigquery,omitempty"`
Databricks *Databricks `yaml:"databricks,omitempty"`
MSSQL *MSSQL `yaml:"mssql,omitempty"`
Snowflake *Snowflake `yaml:"snowflake,omitempty"`
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"`
Reporting Reporting `yaml:"reporting"`
