Skip to content

Commit

Permalink
Remove idempotentKey from BuildMergeQueries()
Browse files Browse the repository at this point in the history
  • Loading branch information
danafallon committed Jul 20, 2024
1 parent 42a388c commit f372cb3
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 317 deletions.
31 changes: 9 additions & 22 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,25 +207,12 @@ func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableId
func (bd BigQueryDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
idempotentKey string,
primaryKeys []columns.Column,
additionalEqualityStrings []string,
cols []columns.Column,
softDelete bool,
_ bool,
) ([]string, error) {
// We should not need idempotency key for DELETE
// This is based on the assumption that the primary key would be atomically increasing or UUID based
// With AI, the sequence will increment (never decrement). And UUID is there to prevent universal hash collision
// However, there may be edge cases where folks end up restoring deleted rows (which will contain the same PK).

// We also need to do staged table's idempotency key is GTE target table's idempotency key
// This is because Snowflake does not respect NS granularity.
var idempotentClause string
if idempotentKey != "" {
idempotentClause = fmt.Sprintf("AND %s.%s >= %s.%s ", constants.StagingAlias, idempotentKey, constants.TargetAlias, idempotentKey)
}

var equalitySQLParts []string
for _, primaryKey := range primaryKeys {
equalitySQL := sql.BuildColumnComparison(primaryKey, constants.TargetAlias, constants.StagingAlias, sql.Equal, bd)
Expand Down Expand Up @@ -255,17 +242,17 @@ MERGE INTO %s %s USING %s AS %s ON %s`,

if softDelete {
return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED %sAND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN MATCHED %sAND IFNULL(%s, false) = true THEN UPDATE SET %s
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN MATCHED AND IFNULL(%s, false) = true THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
// Updating or soft-deleting when we have the previous values (update all columns)
// WHEN MATCHED %sAND IFNULL(%s, false) = false
idempotentClause, sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, bd),
// WHEN MATCHED AND IFNULL(%s, false) = false
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, bd),
// THEN UPDATE SET %s
sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// Soft deleting when we don't have the previous values (only update the __artie_delete column)
// WHEN MATCHED %sAND IFNULL(%s, false) = true
idempotentClause, sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, bd),
// WHEN MATCHED AND IFNULL(%s, false) = true
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, bd),
// THEN UPDATE SET %s
sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, bd),
// Inserting
Expand All @@ -284,12 +271,12 @@ WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED AND %s THEN DELETE
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd),
// WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
// WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s)
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","),
// VALUES (%s);
Expand Down
34 changes: 0 additions & 34 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func TestBigQueryDialect_BuildMergeQueries_TempTable(t *testing.T) {
statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"customers.orders_tmp",
"",
[]columns.Column{cols[0]},
nil,
cols,
Expand Down Expand Up @@ -256,7 +255,6 @@ func TestBigQueryDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"{SUB_QUERY}",
"",
[]columns.Column{cols[0]},
nil,
cols,
Expand All @@ -273,37 +271,6 @@ func TestBigQueryDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

func TestBigQueryDialect_BuildMergeQueries_IdempotentKey(t *testing.T) {
var cols = []columns.Column{
columns.NewColumn("order_id", typing.Integer),
columns.NewColumn("name", typing.String),
columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean),
columns.NewColumn(constants.OnlySetDeleteColumnMarker, typing.Boolean),
}

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("customers.orders")

statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"{SUB_QUERY}",
"idempotent_key",
[]columns.Column{cols[0]},
nil,
cols,
true,
false,
)
assert.NoError(t, err)
assert.Len(t, statements, 1)
assert.Equal(t, []string{
"MERGE INTO customers.orders tgt USING {SUB_QUERY} AS stg ON tgt.`order_id` = stg.`order_id`",
"WHEN MATCHED AND stg.idempotent_key >= tgt.idempotent_key AND IFNULL(stg.`__artie_only_set_delete`, false) = false THEN UPDATE SET `order_id`=stg.`order_id`,`name`=stg.`name`,`__artie_delete`=stg.`__artie_delete`",
"WHEN MATCHED AND stg.idempotent_key >= tgt.idempotent_key AND IFNULL(stg.`__artie_only_set_delete`, false) = true THEN UPDATE SET `__artie_delete`=stg.`__artie_delete`",
"WHEN NOT MATCHED THEN INSERT (`order_id`,`name`,`__artie_delete`) VALUES (stg.`order_id`,stg.`name`,stg.`__artie_delete`);"},
strings.Split(strings.TrimSpace(statements[0]), "\n"))
}

func TestBigQueryDialect_BuildMergeQueries_JSONKey(t *testing.T) {
orderOIDCol := columns.NewColumn("order_oid", typing.Struct)
var cols columns.Columns
Expand All @@ -318,7 +285,6 @@ func TestBigQueryDialect_BuildMergeQueries_JSONKey(t *testing.T) {
statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"customers.orders_tmp",
"",
[]columns.Column{orderOIDCol},
nil,
cols.ValidColumns(),
Expand Down
24 changes: 7 additions & 17 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,12 @@ func (MSSQLDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ b
func (md MSSQLDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
idempotentKey string,
primaryKeys []columns.Column,
_ []string,
cols []columns.Column,
softDelete bool,
_ bool,
) ([]string, error) {
// TODO remove support for idempotentKey
var idempotentClause string
if idempotentKey != "" {
idempotentClause = fmt.Sprintf("AND %s.%s >= %s.%s ", constants.StagingAlias, idempotentKey, constants.TargetAlias, idempotentKey)
}

joinOn := strings.Join(sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, md), " AND ")
baseQuery := fmt.Sprintf(`
MERGE INTO %s %s
Expand All @@ -209,9 +202,6 @@ USING %s AS %s ON %s`,
// Issue an insert statement for new rows, plus two update statements:
// one for rows where all columns should be updated and
// one for rows where only the __artie_delete column should be updated.
if idempotentClause != "" {
idempotentClause = " " + idempotentClause
}
return []string{
fmt.Sprintf(`
INSERT INTO %s (%s)
Expand All @@ -229,23 +219,23 @@ WHERE %s IS NULL;`,
),
fmt.Sprintf(`
UPDATE %s SET %s
FROM %s AS %s LEFT JOIN %s AS %s ON %s%s
FROM %s AS %s LEFT JOIN %s AS %s ON %s
WHERE COALESCE(%s, 0) = 0;`,
// UPDATE table set [all columns]
constants.TargetAlias, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// FROM staging AS stg LEFT JOIN target AS tgt ON tgt.pk = stg.pk
subQuery, constants.StagingAlias, tableID.FullyQualifiedName(), constants.TargetAlias, joinOn, idempotentClause,
subQuery, constants.StagingAlias, tableID.FullyQualifiedName(), constants.TargetAlias, joinOn,
// WHERE __artie_only_set_delete = 0
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, md),
),
fmt.Sprintf(`
UPDATE %s SET %s
FROM %s AS %s LEFT JOIN %s AS %s ON %s%s
FROM %s AS %s LEFT JOIN %s AS %s ON %s
WHERE COALESCE(%s, 0) = 1;`,
// UPDATE table SET __artie_delete = stg.__artie_delete
constants.TargetAlias, sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, md),
// FROM staging AS stg LEFT JOIN target AS tgt ON tgt.pk = stg.pk
subQuery, constants.StagingAlias, tableID.FullyQualifiedName(), constants.TargetAlias, joinOn, idempotentClause,
subQuery, constants.StagingAlias, tableID.FullyQualifiedName(), constants.TargetAlias, joinOn,
// WHERE __artie_only_set_delete = 1
sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, md),
),
Expand All @@ -260,12 +250,12 @@ WHERE COALESCE(%s, 0) = 1;`,

return []string{baseQuery + fmt.Sprintf(`
WHEN MATCHED AND %s = 1 THEN DELETE
WHEN MATCHED AND COALESCE(%s, 0) = 0 %sTHEN UPDATE SET %s
WHEN MATCHED AND COALESCE(%s, 0) = 0 THEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED AND %s = 1 THEN DELETE
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md),
// WHEN MATCHED AND COALESCE(%s, 0) = 0 %sTHEN UPDATE SET %s
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// WHEN MATCHED AND COALESCE(%s, 0) = 0 THEN UPDATE SET %s
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, md),
// WHEN NOT MATCHED AND COALESCE(%s, 1) = 0 THEN INSERT (%s)
sql.QuotedDeleteColumnMarker(constants.StagingAlias, md), strings.Join(sql.QuoteColumns(cols, md), ","),
// VALUES (%s);
Expand Down
23 changes: 0 additions & 23 deletions clients/mssql/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func TestMSSQLDialect_BuildMergeQueries(t *testing.T) {
queries, err := MSSQLDialect{}.BuildMergeQueries(
fakeID,
fqTable,
"",
[]columns.Column{_cols[0]},
[]string{},
_cols,
Expand All @@ -190,35 +189,13 @@ MERGE INTO database.schema.table tgt
USING database.schema.table AS stg ON tgt."id" = stg."id"
WHEN MATCHED AND stg."__artie_delete" = 1 THEN DELETE
WHEN MATCHED AND COALESCE(stg."__artie_delete", 0) = 0 THEN UPDATE SET "id"=stg."id","bar"=stg."bar","updated_at"=stg."updated_at","start"=stg."start"
WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 1) = 0 THEN INSERT ("id","bar","updated_at","start") VALUES (stg."id",stg."bar",stg."updated_at",stg."start");`, queries[0])
}
{
// Idempotent key:
queries, err := MSSQLDialect{}.BuildMergeQueries(
fakeID,
"{SUB_QUERY}",
"idempotent_key",
[]columns.Column{_cols[0]},
[]string{},
_cols,
false,
false,
)
assert.NoError(t, err)
assert.Len(t, queries, 1)
assert.Equal(t, `
MERGE INTO database.schema.table tgt
USING {SUB_QUERY} AS stg ON tgt."id" = stg."id"
WHEN MATCHED AND stg."__artie_delete" = 1 THEN DELETE
WHEN MATCHED AND COALESCE(stg."__artie_delete", 0) = 0 AND stg.idempotent_key >= tgt.idempotent_key THEN UPDATE SET "id"=stg."id","bar"=stg."bar","updated_at"=stg."updated_at","start"=stg."start"
WHEN NOT MATCHED AND COALESCE(stg."__artie_delete", 1) = 0 THEN INSERT ("id","bar","updated_at","start") VALUES (stg."id",stg."bar",stg."updated_at",stg."start");`, queries[0])
}
{
// Soft delete:
queries, err := MSSQLDialect{}.BuildMergeQueries(
fakeID,
"{SUB_QUERY}",
"",
[]columns.Column{_cols[0]},
[]string{},
_cols,
Expand Down
13 changes: 1 addition & 12 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,10 @@ func (rd RedshiftDialect) buildMergeUpdateQueries(
subQuery string,
primaryKeys []columns.Column,
cols []columns.Column,
idempotentKey string,
softDelete bool,
) []string {
clauses := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, rd)

if idempotentKey != "" {
clauses = append(clauses, fmt.Sprintf("%s.%s >= %s.%s", constants.StagingAlias, idempotentKey, constants.TargetAlias, idempotentKey))
}

if !softDelete {
clauses = append(clauses, fmt.Sprintf("COALESCE(%s, false) = false", sql.QuotedDeleteColumnMarker(constants.StagingAlias, rd)))
return []string{fmt.Sprintf(`UPDATE %s AS %s SET %s FROM %s AS %s WHERE %s;`,
Expand Down Expand Up @@ -260,18 +255,12 @@ func (rd RedshiftDialect) buildMergeDeleteQuery(tableID sql.TableIdentifier, sub
func (rd RedshiftDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
idempotentKey string,
primaryKeys []columns.Column,
_ []string,
cols []columns.Column,
softDelete bool,
containsHardDeletes bool,
) ([]string, error) {
// We should not need idempotency key for DELETE
// This is based on the assumption that the primary key would be atomically increasing or UUID based
// With AI, the sequence will increment (never decrement). And UUID is there to prevent universal hash collision
// However, there may be edge cases where folks end up restoring deleted rows (which will contain the same PK).

cols, err := columns.RemoveOnlySetDeleteColumnMarker(cols)
if err != nil {
return []string{}, err
Expand All @@ -287,7 +276,7 @@ func (rd RedshiftDialect) BuildMergeQueries(
}

parts := []string{rd.buildMergeInsertQuery(tableID, subQuery, primaryKeys, cols)}
parts = append(parts, rd.buildMergeUpdateQueries(tableID, subQuery, primaryKeys, cols, idempotentKey, softDelete)...)
parts = append(parts, rd.buildMergeUpdateQueries(tableID, subQuery, primaryKeys, cols, softDelete)...)

if !softDelete && containsHardDeletes {
parts = append(parts, rd.buildMergeDeleteQuery(tableID, subQuery, primaryKeys))
Expand Down
Loading

0 comments on commit f372cb3

Please sign in to comment.