Skip to content

Commit

Permalink
[dml] Pull out Redshift equality parts building
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 3, 2024
1 parent c539bb9 commit cd50456
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
31 changes: 17 additions & 14 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,18 @@ func (m *MergeArgument) Valid() error {
return nil
}

func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equalitySQLParts []string) string {
func (m *MergeArgument) redshiftEqualitySQLParts() []string {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}
return equalitySQLParts
}

func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column) string {
return fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`,
// insert into target (col1, col2, col3)
m.TableID.FullyQualifiedName(), strings.Join(quoteColumns(columns, m.Dialect), ","),
Expand All @@ -81,7 +92,7 @@ func (m *MergeArgument) buildRedshiftInsertQuery(columns []columns.Column, equal
Prefix: "cc.",
}), m.SubQuery,
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
m.TableID.FullyQualifiedName(), strings.Join(m.redshiftEqualitySQLParts(), " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()),
)
Expand Down Expand Up @@ -126,24 +137,16 @@ func (m *MergeArgument) GetParts() ([]string, error) {
idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey)
}

var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name())
equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}

if m.SoftDelete {
return []string{
// INSERT
m.buildRedshiftInsertQuery(m.Columns, equalitySQLParts),
m.buildRedshiftInsertQuery(m.Columns),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(m.Columns, m.Dialect),
// FROM table (temp) WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause,
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause,
),
}, nil
}
Expand All @@ -156,13 +159,13 @@ func (m *MergeArgument) GetParts() ([]string, error) {

parts := []string{
// INSERT
m.buildRedshiftInsertQuery(columns, equalitySQLParts),
m.buildRedshiftInsertQuery(columns),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
m.TableID.FullyQualifiedName(), buildColumnsUpdateFragment(columns, m.Dialect),
// FROM staging WHERE join on PK(s)
m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
m.SubQuery, strings.Join(m.redshiftEqualitySQLParts(), " and "), idempotentClause, m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker),
),
}

Expand Down
23 changes: 19 additions & 4 deletions lib/destination/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,22 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) {
assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL)
}

func TestBuildRedshiftInsertQuery(t *testing.T) {
func TestMergeArgument_RedshiftEqualitySQLParts(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
columns.NewColumn("col2", typing.Invalid),
}

mergeArg := MergeArgument{
TableID: MockTableIdentifier{"{TABLE_ID}"},
SubQuery: "{SUB_QUERY}",
PrimaryKeys: []columns.Column{cols[0], columns.NewColumn("othercol", typing.Invalid)},
Dialect: sql.SnowflakeDialect{},
}
assert.Equal(t, []string{}, mergeArg.redshiftEqualitySQLParts())
}

func TestMergeArgument_BuildRedshiftInsertQuery(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
columns.NewColumn("col2", typing.Invalid),
Expand All @@ -285,12 +300,12 @@ func TestBuildRedshiftInsertQuery(t *testing.T) {
Dialect: sql.SnowflakeDialect{},
}
assert.Equal(t,
`INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on {EQUALITY_PART_1} and {EQUALITY_PART_2} WHERE c."COL1" IS NULL;`,
mergeArg.buildRedshiftInsertQuery(cols, []string{"{EQUALITY_PART_1}", "{EQUALITY_PART_2}"}),
`INSERT INTO {TABLE_ID} ("COL1","COL2") SELECT cc."COL1",cc."COL2" FROM {SUB_QUERY} as cc LEFT JOIN {TABLE_ID} as c on c."COL1" = cc."COL1" and c."OTHERCOL" = cc."OTHERCOL" WHERE c."COL1" IS NULL;`,
mergeArg.buildRedshiftInsertQuery(cols),
)
}

func TestBuildRedshiftDeleteQuery(t *testing.T) {
func TestMergeArgument_BuildRedshiftDeleteQuery(t *testing.T) {
cols := []columns.Column{
columns.NewColumn("col1", typing.Invalid),
columns.NewColumn("col2", typing.Invalid),
Expand Down

0 comments on commit cd50456

Please sign in to comment.