Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 2, 2024
1 parent 167b126 commit 59aa085
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func (m *MergeArgument) GetParts() ([]string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))
escapedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.RawName())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", escapedPrimaryKey, escapedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}

Expand All @@ -115,7 +116,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
m.PrimaryKeys[0].Name(m.Dialect)),
m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].RawName())),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
Expand All @@ -142,7 +143,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {

var pks []string
for _, pk := range m.PrimaryKeys {
pks = append(pks, pk.Name(m.Dialect))
pks = append(pks, m.Dialect.QuoteIdentifier(pk.RawName()))
}

parts := []string{
Expand All @@ -159,7 +160,7 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
m.PrimaryKeys[0].Name(m.Dialect)),
m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].RawName())),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
Expand Down Expand Up @@ -207,15 +208,17 @@ func (m *MergeArgument) GetStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))
escapedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.RawName())

equalitySQL := fmt.Sprintf("c.%s = cc.%s", escapedPrimaryKey, escapedPrimaryKey)
pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName())
if !isOk {
return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns)
}

if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind {
// BigQuery requires special casting to compare two JSON objects.
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", escapedPrimaryKey, escapedPrimaryKey)
}

equalitySQLParts = append(equalitySQLParts, equalitySQL)
Expand Down Expand Up @@ -295,7 +298,8 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.Name(m.Dialect), primaryKey.Name(m.Dialect))
escapedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.RawName())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", escapedPrimaryKey, escapedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}

Expand Down

0 comments on commit 59aa085

Please sign in to comment.