Skip to content

Commit

Permalink
Quote constants.DeleteColumnMarker in merge queries
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 2, 2024
1 parent a152157 commit 8d08055
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
20 changes: 10 additions & 10 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), m.Columns.UpdateQuery(m.Dialect, true),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, constants.DeleteColumnMarker,
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
),
}

Expand All @@ -183,7 +183,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
Vals: pks,
Separator: ",",
Prefix: "cc.",
}), m.SubQuery, constants.DeleteColumnMarker,
}), m.SubQuery, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
))
}

Expand Down Expand Up @@ -246,7 +246,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Expand All @@ -268,11 +268,11 @@ WHEN MATCHED AND IFNULL(cc.%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), subQuery, strings.Join(equalitySQLParts, " and "),
// Delete
constants.DeleteColumnMarker,
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Expand Down Expand Up @@ -310,7 +310,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Expand All @@ -333,11 +333,11 @@ WHEN MATCHED AND COALESCE(cc.%s, 0) = 0 %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
m.TableID.FullyQualifiedName(), m.SubQuery, strings.Join(equalitySQLParts, " and "),
// Delete
constants.DeleteColumnMarker,
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Expand Down
18 changes: 9 additions & 9 deletions lib/destination/dml/merge_parts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND COALESCE(cc.__artie_delete, false) = false;`,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])
}

Expand Down Expand Up @@ -196,11 +196,11 @@ func TestMergeStatementParts(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND COALESCE(cc.__artie_delete, false) = false;`,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp as cc WHERE cc."__artie_delete" = true);`,
parts[2])

mergeArg = &MergeArgument{
Expand All @@ -223,11 +223,11 @@ func TestMergeStatementParts(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at AND COALESCE(cc.__artie_delete, false) = false;`,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp as cc WHERE cc."__artie_delete" = true);`,
parts[2])
}

Expand All @@ -254,11 +254,11 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" and c."email" = cc."email" AND COALESCE(cc.__artie_delete, false) = false;`,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" and c."email" = cc."email" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp as cc WHERE cc."__artie_delete" = true);`,
parts[2])

mergeArg = &MergeArgument{
Expand All @@ -281,10 +281,10 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" and c."email" = cc."email" AND cc.created_at >= c.created_at AND COALESCE(cc.__artie_delete, false) = false;`,
`UPDATE public.tableName as c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp as cc WHERE c."id" = cc."id" and c."email" = cc."email" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp as cc WHERE cc."__artie_delete" = true);`,
parts[2])
}

0 comments on commit 8d08055

Please sign in to comment.