diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index e3ccbf39f..bce889e45 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -70,7 +70,18 @@ func (m *MergeArgument) Valid() error { return nil } -func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string { +func (m *MergeArgument) redshiftEqualitySQLParts() []string { + var equalitySQLParts []string + for _, primaryKey := range m.PrimaryKeys { + // We'll need to escape the primary key as well. + quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) + equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) + equalitySQLParts = append(equalitySQLParts, equalitySQL) + } + return equalitySQLParts +} + +func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column) string { return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, // insert into target (col1, col2, col3) m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(columns, m.Dialect), ","), @@ -81,7 +92,7 @@ func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equal Prefix: "cc.", }), m.SubQuery, // LEFT JOIN table on pk(s) - m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "), + m.TableID.FullyQualifiedName(), strings.Join(m.redshiftEqualitySQLParts(), " and "), // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()), ) @@ -126,24 +137,16 @@ func (m *MergeArgument) GetParts() ([]string, error) { idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey) } - var equalitySQLParts []string - for _, primaryKey := range m.PrimaryKeys { - // We'll need to escape the primary key as well. - quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name()) - equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey) - equalitySQLParts = append(equalitySQLParts, equalitySQL) - } - if m.SoftDelete { return []string{ // INSERT - m.buildRedshiftInsertQuery(m.Columns, equalitySQLParts), + m.buildRedshiftInsertQuery(m.Columns), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, // UPDATE table set col1 = cc. col1 m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect), // FROM table (temp) WHERE join on PK(s) - m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, + m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause, ), }, nil } @@ -156,13 +159,13 @@ func (m *MergeArgument) GetParts() ([]string, error) { parts := []string{ // INSERT - m.buildRedshiftInsertQuery(columns, equalitySQLParts), + m.buildRedshiftInsertQuery(columns), // UPDATE fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, // UPDATE table set col1 = cc. col1 m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect), // FROM staging WHERE join on PK(s) - m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), + m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker), ), } diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index dc2195693..8cef22f16 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -272,7 +272,22 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) { assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL) } -func TestBuildRedshiftInsertQuery(t *testing.T) { +func TestMergeArgument_RedshiftEqualitySQLParts(t *testing.T) { + cols := []columns.Column{ + columns.NewColumn("col1", typing.Invalid), + columns.NewColumn("col2", typing.Invalid), + } + + mergeArg := MergeArgument{ + TableID: MockTableIdentifier{"{TABLE_ID}"}, + SubQuery: "{SUB_QUERY}", + PrimaryKeys: []columns.Column{cols[0], columns.NewColumn("othercol", typing.Invalid)}, + Dialect: sql.SnowflakeDialect{}, + } + assert.Equal(t, []string{}, mergeArg.redshiftEqualitySQLParts()) +} + +func TestMergeArgument_BuildRedshiftInsertQuery(t *testing.T) { cols := []columns.Column{ columns.NewColumn("col1", typing.Invalid), columns.NewColumn("col2", typing.Invalid), @@ -285,12 +300,12 @@ func TestBuildRedshiftInsertQuery(t *testing.T) { Dialect: sql.SnowflakeDialect{}, } assert.Equal(t, - `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`, - mergeArg.buildRedshiftInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}), + `INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on c."COL1" = cc."COL1" and c."OTHERCOL" = cc."OTHERCOL" WHERE c."COL1" IS NULL;`, + mergeArg.buildRedshiftInsertQuery(cols), ) } -func TestBuildRedshiftDeleteQuery(t *testing.T) { +func TestMergeArgument_BuildRedshiftDeleteQuery(t *testing.T) { cols := []columns.Column{ columns.NewColumn("col1", typing.Invalid), columns.NewColumn("col2", typing.Invalid),