Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 30, 2024
1 parent bbd06ae commit 02caa70
Showing 1 changed file with 100 additions and 0 deletions.
100 changes: 100 additions & 0 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package dialect

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/sql"
)

type DatabricksDialect struct{}
Expand All @@ -13,3 +22,94 @@ func (DatabricksDialect) QuoteIdentifier(identifier string) string {
func (DatabricksDialect) EscapeStruct(value string) string {
panic("not implemented")
}

func (DatabricksDialect) IsColumnAlreadyExistsErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "[FIELDS_ALREADY_EXISTS]")
}

func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "[TABLE_OR_VIEW_NOT_FOUND]")
}

func (DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Databricks doesn't have a concept of temporary tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ", "))
}

func (DatabricksDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
panic("not implemented")
}

func (DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
panic("not implemented")
}

func (DatabricksDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string {
panic("not implemented") // We don't currently support deduping for MS SQL.
}

func (d DatabricksDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
primaryKeys []columns.Column,
additionalEqualityStrings []string,
cols []columns.Column,
softDelete bool,
_ bool,
) ([]string, error) {
// Build the base equality condition for the MERGE query
equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d)
if len(additionalEqualityStrings) > 0 {
equalitySQLParts = append(equalitySQLParts, additionalEqualityStrings...)
}

// Construct the base MERGE query
baseQuery := fmt.Sprintf(`MERGE INTO %s %s USING %s %s ON %s`, tableID.FullyQualifiedName(), constants.TargetAlias, subQuery, constants.StagingAlias, strings.Join(equalitySQLParts, " AND "))
// Remove columns with only the delete marker, as they are handled separately
cols, err := columns.RemoveOnlySetDeleteColumnMarker(cols)
if err != nil {
return nil, err
}

if softDelete {
// If softDelete is enabled, handle both update and soft-delete logic
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN MATCHED AND IFNULL(%s, false) = true THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d),
sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d),
sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, d),
strings.Join(sql.QuoteColumns(cols, d), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
)}, nil
}

// Remove the delete marker for hard-delete logic
cols, err = columns.RemoveDeleteColumnMarker(cols)
if err != nil {
return nil, err
}

// Handle the case where hard-deletes are included
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d),
sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, d),
strings.Join(sql.QuoteColumns(cols, d), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
)}, nil
}

func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
return sql.Native
}

0 comments on commit 02caa70

Please sign in to comment.