Skip to content

Commit

Permalink
Pull out queries
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 3, 2024
1 parent 5a33938 commit ca6492b
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 23 deletions.
57 changes: 34 additions & 23 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,38 @@ func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column) strin
)
}

func (m *MergeArgument) buildRedshiftSoftDeleteUpdateQuery(columns []columns.Column) string {
// We also need to do staged table's idempotency key is GTE target table's idempotency key
// This is because Snowflake does not respect NS granularity.
var idempotentClause string
if m.IdempotentKey != "" {
idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey)
}

return fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause,
)
}

func (m *MergeArgument) buildRedshiftUpdateQuery(columns []columns.Column) string {
// We also need to do staged table's idempotency key is GTE target table's idempotency key
// This is because Snowflake does not respect NS granularity.
var idempotentClause string
if m.IdempotentKey != "" {
idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey)
}

return fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
)
}

func (m *MergeArgument) buildRedshiftDeleteQuery() string {
return fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`,
// DELETE from table where (pk_1, pk_2)
Expand Down Expand Up @@ -130,24 +162,10 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// With AI, the sequence will increment (never decrement). And UUID is there to prevent universal hash collision
// However, there may be edge cases where folks end up restoring deleted rows (which will contain the same PK).

// We also need to do staged table's idempotency key is GTE target table's idempotency key
// This is because Snowflake does not respect NS granularity.
var idempotentClause string
if m.IdempotentKey != "" {
idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey)
}

if m.SoftDelete {
return []string{
// INSERT
m.buildRedshiftInsertQuery(m.Columns),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause,
),
m.buildRedshiftSoftDeleteUpdateQuery(m.Columns),
}, nil
}

Expand All @@ -158,15 +176,8 @@ func (m *MergeArgument) GetParts() ([]string, error) {
}

parts := []string{
// INSERT
m.buildRedshiftInsertQuery(columns),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
),
m.buildRedshiftUpdateQuery(columns),
}

if *m.ContainsHardDeletes {
Expand Down
38 changes: 38 additions & 0 deletions lib/destination/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,44 @@ func TestMergeArgument_BuildRedshiftInsertQuery(t *testing.T) {
)
}

func TestMergeArgument_BuildRedshiftSoftDeleteUpdateQuery(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
columns.NewColumn("col2", typing.Invalid),
columns.NewColumn("col3", typing.Invalid),
}

mergeArg := MergeArgument{
TableID: MockTableIdentifier{"{TABLE_ID}"},
SubQuery: "{SUB_QUERY}",
PrimaryKeys: []columns.Column{cols[0], cols[2]},
Dialect: sql.SnowflakeDialect{},
}
assert.Equal(t,
`UPDATE {TABLE_ID} as c SET "COL1"=cc."COL1","COL2"=cc."COL2","COL3"=cc."COL3" FROM {SUB_QUERY} as cc WHERE c."COL1" = cc."COL1" and c."COL3" = cc."COL3";`,
mergeArg.buildRedshiftSoftDeleteUpdateQuery(cols),
)
}

func TestMergeArgument_BuildRedshiftUpdateQuery(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
columns.NewColumn("col2", typing.Invalid),
columns.NewColumn("col3", typing.Invalid),
}

mergeArg := MergeArgument{
TableID: MockTableIdentifier{"{TABLE_ID}"},
SubQuery: "{SUB_QUERY}",
PrimaryKeys: []columns.Column{cols[0], cols[2]},
Dialect: sql.SnowflakeDialect{},
}
assert.Equal(t,
`UPDATE {TABLE_ID} as c SET "COL1"=cc."COL1","COL2"=cc."COL2","COL3"=cc."COL3" FROM {SUB_QUERY} as cc WHERE c."COL1" = cc."COL1" and c."COL3" = cc."COL3" AND COALESCE(cc."__ARTIE_DELETE", false) = false;`,
mergeArg.buildRedshiftUpdateQuery(cols),
)
}

func TestMergeArgument_BuildRedshiftDeleteQuery(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
Expand Down

0 comments on commit ca6492b

Please sign in to comment.