Skip to content

Commit

Permalink
BigQuery <> MongoDB equality fix (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Apr 7, 2023
1 parent a547afa commit 54478c9
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 10 deletions.
9 changes: 8 additions & 1 deletion clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ func merge(tableData *optimization.TableData) (string, error) {
}

subQuery := strings.Join(rowValues, " UNION ALL ")

var specialCastForPrimaryKey bool
pkType, isOk := tableData.InMemoryColumns[tableData.PrimaryKey]
if isOk {
specialCastForPrimaryKey = pkType.Kind == typing.Struct.Kind
}

return dml.MergeStatement(tableData.ToFqName(constants.BigQuery), subQuery,
tableData.PrimaryKey, tableData.IdempotentKey, cols, tableData.SoftDelete)
tableData.PrimaryKey, tableData.IdempotentKey, cols, tableData.SoftDelete, specialCastForPrimaryKey)
}

func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
Expand Down
58 changes: 58 additions & 0 deletions clients/bigquery/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,64 @@ func (b *BigQueryTestSuite) TestMerge() {
assert.NoError(b.T(), err, "merge failed")
// Check if MERGE INTO FQ Table exists.
assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL)
// Check for equality merge
assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s", tableData.PrimaryKey, tableData.PrimaryKey)))
for _, rowData := range tableData.RowsData {
for col, val := range rowData {
switch cols[col] {
case typing.String, typing.Array, typing.Struct:
val = fmt.Sprintf("'%v'", val)
}

assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprint(val)), map[string]interface{}{
"merge": mergeSQL,
"val": val,
})
}
}
}

func (b *BigQueryTestSuite) TestMergeJSONKey() {
cols := map[string]typing.KindDetails{
"id": typing.Struct,
"name": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
}

rowData := make(map[string]map[string]interface{})
for idx, name := range []string{"robin", "jacqueline", "dusty"} {
pkVal := fmt.Sprint(map[string]interface{}{
"$oid": fmt.Sprintf("640127e4beeb1ccfc821c25c++%v", idx),
})

rowData[pkVal] = map[string]interface{}{
"id": pkVal,
"name": name,
constants.DeleteColumnMarker: false,
}
}

topicConfig := kafkalib.TopicConfig{
Database: "shop",
TableName: "customer",
Schema: "public",
}

tableData := &optimization.TableData{
InMemoryColumns: cols,
RowsData: rowData,
PrimaryKey: "id",
TopicConfig: topicConfig,
LatestCDCTs: time.Time{},
}

mergeSQL, err := merge(tableData)

assert.NoError(b.T(), err, "merge failed")
// Check if MERGE INTO FQ Table exists.
assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL)
// Check for equality merge
assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", tableData.PrimaryKey, tableData.PrimaryKey)))
for _, rowData := range tableData.RowsData {
for col, val := range rowData {
switch cols[col] {
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
strings.Join(tableValues, ","), tableData.TopicConfig.TableName, strings.Join(cols, ","))

return dml.MergeStatement(tableData.ToFqName(constants.Snowflake), subQuery,
tableData.PrimaryKey, tableData.IdempotentKey, cols, tableData.SoftDelete)
tableData.PrimaryKey, tableData.IdempotentKey, cols, tableData.SoftDelete, false)
}
1 change: 1 addition & 0 deletions clients/snowflake/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *SnowflakeTestSuite) TestMerge() {

// Check if MERGE INTO FQ Table exists.
assert.True(s.T(), strings.Contains(mergeSQL, "MERGE INTO shop.public.customer c"))
assert.True(s.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s", tableData.PrimaryKey, tableData.PrimaryKey)))
for _, rowData := range tableData.RowsData {
for col, val := range rowData {
switch cols[col] {
Expand Down
16 changes: 11 additions & 5 deletions lib/dwh/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
)

func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []string, softDelete bool) (string, error) {
func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []string, softDelete bool, specialStructCastMergeKey bool) (string, error) {
// We should not need an idempotency key for DELETE.
// This is based on the assumption that the primary key is either monotonically increasing or UUID-based.
// With auto-increment (AI), the sequence only increments (never decrements), and a UUID is there to prevent universal hash collisions.
Expand All @@ -22,9 +22,15 @@ func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []stri
idempotentClause = fmt.Sprintf("AND cc.%s >= c.%s ", idempotentKey, idempotentKey)
}

equalitySQL := fmt.Sprintf("c.%s = cc.%s", pk, pk)
if specialStructCastMergeKey {
// BigQuery requires special casting to compare two JSON objects.
equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", pk, pk)
}

if softDelete {
return fmt.Sprintf(`
MERGE INTO %s c using (%s) as cc on c.%s = cc.%s
MERGE INTO %s c using (%s) as cc on %s
when matched %sthen UPDATE
SET %s
when not matched AND IFNULL(cc.%s, false) = false then INSERT
Expand All @@ -35,7 +41,7 @@ func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []stri
(
%s
);
`, fqTableName, subQuery, pk, pk,
`, fqTableName, subQuery, equalitySQL,
// Update + Soft Deletion
idempotentClause, array.ColumnsUpdateQuery(cols, "cc"),
// Insert
Expand All @@ -58,7 +64,7 @@ func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []stri
}

return fmt.Sprintf(`
MERGE INTO %s c using (%s) as cc on c.%s = cc.%s
MERGE INTO %s c using (%s) as cc on %s
when matched AND cc.%s then DELETE
when matched AND IFNULL(cc.%s, false) = false %sthen UPDATE
SET %s
Expand All @@ -70,7 +76,7 @@ func MergeStatement(fqTableName, subQuery, pk, idempotentKey string, cols []stri
(
%s
);
`, fqTableName, subQuery, pk, pk,
`, fqTableName, subQuery, equalitySQL,
// Delete
constants.DeleteColumnMarker,
// Update
Expand Down
6 changes: 3 additions & 3 deletions lib/dwh/dml/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestMergeStatementSoftDelete(t *testing.T) {
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

for _, idempotentKey := range []string{"", "updated_at"} {
mergeSQL, err := MergeStatement(fqTable, subQuery, "id", idempotentKey, cols, true)
mergeSQL, err := MergeStatement(fqTable, subQuery, "id", idempotentKey, cols, true, false)
assert.NoError(t, err)
assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL)
// Soft deletion flag being passed.
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestMergeStatement(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

mergeSQL, err := MergeStatement(fqTable, subQuery, "id", "", cols, false)
mergeSQL, err := MergeStatement(fqTable, subQuery, "id", "", cols, false, false)
assert.NoError(t, err)
assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL)
assert.False(t, strings.Contains(mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at")), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand All @@ -86,7 +86,7 @@ func TestMergeStatementIdempotentKey(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))

mergeSQL, err := MergeStatement(fqTable, subQuery, "id", "updated_at", cols, false)
mergeSQL, err := MergeStatement(fqTable, subQuery, "id", "updated_at", cols, false, false)
assert.NoError(t, err)
assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL)
assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at")), fmt.Sprintf("Idempotency key: %s", mergeSQL))
Expand Down

0 comments on commit 54478c9

Please sign in to comment.