Skip to content

Commit

Permalink
[Databricks] Better handling of TOAST columns (#1083)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 16, 2024
1 parent 4862a93 commit a725cd4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
9 changes: 6 additions & 3 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
}

// BuildIsNotToastValueExpression returns a SQL boolean expression over the given column,
// qualified by tableAlias and quoted through sql.QuoteTableAliasColumn, that compares the
// column against constants.ToastUnavailableValuePlaceholder. Every variant wraps the
// comparison in COALESCE with a true fallback, so a NULL comparison result yields true.
//
// NOTE(review): this listing is a rendered diff whose +/- markers were lost, so the
// pre-commit and post-commit lines are interleaved and the span does not parse as one
// function. Going by the hunk header (-31,11 +31,14) and the 6 additions / 3 deletions
// summary, lines tagged [old] are the removed version and lines tagged [new] are the
// added version; untagged lines are shared context.
func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
// [new] LIKE pattern: the placeholder wrapped in percent signs, so it matches anywhere in the value.
// NOTE(review): underscore is a single-character wildcard in LIKE, and the placeholder shown in
// the tests starts with underscores - confirm the looser match is acceptable.
toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%"
colName := sql.QuoteTableAliasColumn(tableAlias, column, d)
// [old] struct columns were compared against a literal with a single key entry.
if column.KindDetails == typing.Struct { // [old]
return fmt.Sprintf("COALESCE(%s != {'key': '%s'}, true)", colName, constants.ToastUnavailableValuePlaceholder) // [old]
// [new] string columns are matched directly; every other kind is cast to STRING first.
switch column.KindDetails { // [new]
case typing.String: // [new]
return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue) // [new]
default: // [new]
return fmt.Sprintf("COALESCE(CAST(%s AS STRING) NOT LIKE '%s', TRUE)", colName, toastedValue) // [new]
}
// [old] every non-struct column used an exact inequality against the placeholder.
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) // [old]
}

func (DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
Expand Down
24 changes: 24 additions & 0 deletions clients/databricks/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,30 @@ func TestDatabricksDialect_IsTableDoesNotExistErr(t *testing.T) {
}
}

func TestDatabricks_BuildIsNotToastValueExpression(t *testing.T) {
{
// Unspecified data type
assert.Equal(t,
"COALESCE(CAST(tbl.`bar` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)",
DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
}
{
// Structs
assert.Equal(t,
"COALESCE(CAST(tbl.`foo` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)",
DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
}
{
// String
assert.Equal(t,
"COALESCE(tbl.`bar` NOT LIKE '%__debezium_unavailable_value%', TRUE)",
DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.String)),
)
}
}

func TestDatabricksDialect_BuildCreateTableQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{TABLE}")
Expand Down
2 changes: 1 addition & 1 deletion clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio

// Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
// We'll need \\\\N here because we need to string escape.
// NOTE(review): the two copyCommand statements below are the before and after sides of a
// one-line diff whose +/- markers were lost (the file summary reports 1 addition and
// 1 deletion); both cannot coexist, since a second short variable declaration of the same
// name in one scope does not compile.
// [old] pre-commit statement: FORMAT_OPTIONS sets delimiter, header and nullValue only.
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath())
// [new] post-commit statement: identical except that FORMAT_OPTIONS additionally sets the
// escape option to a double quote.
copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('escape' = '"', 'delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath())
// Run the statement and wrap any failure with context for the caller.
if _, err = s.ExecContext(ctx, copyCommand); err != nil {
return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err)
}
Expand Down

0 comments on commit a725cd4

Please sign in to comment.