diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index 271ee33e1..d494e1f7e 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -77,7 +77,7 @@ func removeDeleteColumnMarker(cols []columns.Column) ([]columns.Column, bool) { return cols, len(cols) != origLength } -func (m *MergeArgument) buildInsertQuery(columns []columns.Column, equalitySQLParts []string) string { +func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(columns, m.Dialect), ","), @@ -131,7 +131,7 @@ func (m *MergeArgument) GetParts() ([]string, error) { if m.SoftDelete { return []string{ // INSERT - m.buildInsertQuery(m.Columns, equalitySQLParts), + m.buildRedshiftInsertQuery(m.Columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 @@ -148,14 +148,9 @@ func (m *MergeArgument) GetParts() ([]string, error) { return nil, errors.New("artie delete flag doesn't exist") } - var pks []string - for _, pk := range m.PrimaryKeys { - pks = append(pks, m.Dialect.QuoteIdentifier(pk.Name())) - } - parts := []string{ // INSERT - m.buildInsertQuery(columns, equalitySQLParts), + m.buildRedshiftInsertQuery(columns, equalitySQLParts), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 @@ -170,10 +165,10 @@ func (m *MergeArgument) GetParts() ([]string, error) { // DELETE fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`, // DELETE from table where (pk_1, pk_2) - m.TableID.FullyQualifiedName(), strings.Join(pks, ","), + m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(m.PrimaryKeys, m.Dialect), ","), // IN (cc.pk_1, cc.pk_2) FROM staging array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ - Vals: pks, + Vals: quoteColumns(m.PrimaryKeys, m.Dialect), Separator: ",", Prefix: "cc.", }), m.SubQuery, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 3836e7282..04453597b 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -324,6 +324,6 @@ func TestBuildInsertQuery(t *testing.T) { } assert.Equal(t, `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`, - mergeArg.buildInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), + mergeArg.buildRedshiftInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), ) }