From 7e4a930037bf5167ee3a3416d181c8b45a9ab6dd Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 30 Sep 2024 11:45:41 -0700 Subject: [PATCH] WIP. --- clients/databricks/dialect/dialect.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 2282b0074..be27b0f22 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -116,44 +116,55 @@ func (d DatabricksDialect) BuildMergeQueries( softDelete bool, containsHardDeletes bool, ) ([]string, error) { + // Build the base equality condition for the MERGE query equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d) if len(additionalEqualityStrings) > 0 { equalitySQLParts = append(equalitySQLParts, additionalEqualityStrings...) } + + // Construct the base MERGE query baseQuery := fmt.Sprintf(` -MERGE INTO %s %s USING ( %s ) AS %s ON %s`, +MERGE INTO %s %s USING %s %s ON %s`, tableID.FullyQualifiedName(), constants.TargetAlias, subQuery, constants.StagingAlias, strings.Join(equalitySQLParts, " AND "), ) + // Remove columns with only the delete marker, as they are handled separately cols, err := columns.RemoveOnlySetDeleteColumnMarker(cols) if err != nil { - return []string{}, err + return nil, err } if softDelete { + // If softDelete is enabled, handle both update and soft-delete logic return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s WHEN MATCHED AND IFNULL(%s, false) = true THEN UPDATE SET %s WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`, - sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d), - sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, d), + sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), + sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d), + sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), + sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, d), strings.Join(sql.QuoteColumns(cols, d), ","), strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","), )}, nil } + // Remove the delete marker for hard-delete logic cols, err = columns.RemoveDeleteColumnMarker(cols) if err != nil { - return []string{}, err + return nil, err } + // Handle the case where hard-deletes are included return []string{baseQuery + fmt.Sprintf(` WHEN MATCHED AND %s THEN DELETE WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`, sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), - sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d), - sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), strings.Join(sql.QuoteColumns(cols, d), ","), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), + sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d), + sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), + strings.Join(sql.QuoteColumns(cols, d), ","), strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","), )}, nil }