Skip to content

Commit

Permalink
[dml] Change MergeArgument.buildDefaultStatement to return a slice
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 13, 2024
1 parent cd9e195 commit 0401f71
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
18 changes: 7 additions & 11 deletions lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (m *MergeArgument) buildRedshiftStatements() ([]string, error) {
return parts, nil
}

func (m *MergeArgument) buildDefaultStatement() (string, error) {
func (m *MergeArgument) buildDefaultStatements() ([]string, error) {
// We should not need idempotency key for DELETE
// This is based on the assumption that the primary key would be atomically increasing or UUID based
// With AI, the sequence will increment (never decrement). And UUID is there to prevent universal hash collision
Expand Down Expand Up @@ -203,7 +203,7 @@ func (m *MergeArgument) buildDefaultStatement() (string, error) {
}

if m.SoftDelete {
return fmt.Sprintf(`
return []string{fmt.Sprintf(`
MERGE INTO %s c USING %s AS cc ON %s
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`,
Expand All @@ -216,16 +216,16 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
Vals: columns.QuoteColumns(m.Columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
}))}, nil
}

// We also need to remove __artie flags since it does not exist in the destination table
cols, removed := columns.RemoveDeleteColumnMarker(m.Columns)
if !removed {
return "", errors.New("artie delete flag doesn't exist")
return []string{}, errors.New("artie delete flag doesn't exist")
}

return fmt.Sprintf(`
return []string{fmt.Sprintf(`
MERGE INTO %s c USING %s AS cc ON %s
WHEN MATCHED AND cc.%s THEN DELETE
WHEN MATCHED AND IFNULL(cc.%s, false) = false %sTHEN UPDATE SET %s
Expand All @@ -241,7 +241,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
Vals: columns.QuoteColumns(cols, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
}))}, nil
}

func (m *MergeArgument) BuildStatements() ([]string, error) {
Expand All @@ -264,10 +264,6 @@ func (m *MergeArgument) BuildStatements() ([]string, error) {
m.ContainsHardDeletes,
)
default:
mergeQuery, err := m.buildDefaultStatement()
if err != nil {
return nil, err
}
return []string{mergeQuery}, nil
return m.buildDefaultStatements()
}
}
18 changes: 10 additions & 8 deletions lib/destination/dml/merge_bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func TestMergeStatement_TempTable(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.NoError(t, err)

assert.Contains(t, mergeSQL, "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON c.`order_id` = cc.`order_id`", mergeSQL)
assert.Len(t, statements, 1)
assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON c.`order_id` = cc.`order_id`")
}

func TestMergeStatement_JSONKey(t *testing.T) {
Expand All @@ -54,9 +54,10 @@ func TestMergeStatement_JSONKey(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Contains(t, mergeSQL, "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON TO_JSON_STRING(c.`order_oid`) = TO_JSON_STRING(cc.`order_oid`)", mergeSQL)
assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON TO_JSON_STRING(c.`order_oid`) = TO_JSON_STRING(cc.`order_oid`)")
}

func TestMergeArgument_BuildStatements_BigQuery(t *testing.T) {
Expand All @@ -75,9 +76,10 @@ func TestMergeArgument_BuildStatements_BigQuery(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements1, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements1, 1)
assert.NoError(t, err)
statements, err := mergeArg.BuildStatements()
statements2, err := mergeArg.BuildStatements()
assert.NoError(t, err)
assert.Equal(t, []string{mergeSQL}, statements)
assert.Equal(t, statements1, statements2)
}
20 changes: 15 additions & 5 deletions lib/destination/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func TestMergeStatementSoftDelete(t *testing.T) {
SoftDelete: true,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
mergeSQL := statements[0]
assert.NoError(t, err)
assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL)
// Soft deletion flag being passed.
Expand Down Expand Up @@ -104,7 +106,9 @@ func TestMergeStatement(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
mergeSQL := statements[0]
assert.NoError(t, err)
assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL)
assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand Down Expand Up @@ -153,7 +157,9 @@ func TestMergeStatementIdempotentKey(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
mergeSQL := statements[0]
assert.NoError(t, err)
assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL)
assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand Down Expand Up @@ -199,7 +205,9 @@ func TestMergeStatementCompositeKey(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
mergeSQL := statements[0]
assert.NoError(t, err)
assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL)
assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand Down Expand Up @@ -249,7 +257,9 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) {
SoftDelete: false,
}

mergeSQL, err := mergeArg.buildDefaultStatement()
statements, err := mergeArg.buildDefaultStatements()
assert.Len(t, statements, 1)
mergeSQL := statements[0]
assert.NoError(t, err)
assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL)
assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand Down

0 comments on commit 0401f71

Please sign in to comment.