Skip to content

Commit

Permalink
Clean up.
Browse files: browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 30, 2024
1 parent 8f7687b commit 3975278
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
5 changes: 3 additions & 2 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif

var parts []string
parts = append(parts,
fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`,
fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s")) AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`,
stagingTableID.FullyQualifiedName(),
typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
tableID.FullyQualifiedName(),
Expand All @@ -182,8 +182,9 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif
whereClauses = append(whereClauses, fmt.Sprintf("t1.%s = t2.%s", primaryKeyEscaped, primaryKeyEscaped))
}

// https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#delete_with_subquery
parts = append(parts,
fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s",
fmt.Sprintf("DELETE FROM %s t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE %s)",
tableID.FullyQualifiedName(),
stagingTableID.FullyQualifiedName(),
strings.Join(whereClauses, " AND "),
Expand Down
22 changes: 11 additions & 11 deletions clients/bigquery/bigquery_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/stretchr/testify/assert"
"github.com/artie-labs/transfer/lib/typing"
)

func (b *BigQueryTestSuite) TestGenerateDedupeQueries() {
Expand All @@ -24,13 +24,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() {
assert.Len(b.T(), parts, 3)
assert.Equal(
b.T(),
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY id ASC) = 2)",
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 USING %s t2 WHERE t1.id = t2.id", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`id` = t2.`id`)", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project12`.`public`.`customers` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2])
}
{
Expand All @@ -42,13 +42,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() {
assert.Len(b.T(), parts, 3)
assert.Equal(
b.T(),
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY id ASC, __artie_updated_at ASC) = 2)",
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC, __artie_updated_at ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 USING %s t2 WHERE t1.id = t2.id", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`id` = t2.`id`)", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project12`.`public`.`customers` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2])
}
{
Expand All @@ -60,13 +60,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() {
assert.Len(b.T(), parts, 3)
assert.Equal(
b.T(),
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, settings ORDER BY user_id ASC, settings ASC) = 2)",
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 USING %s t2 WHERE t1.user_id = t2.user_id AND t1.settings = t2.settings", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`user_id` = t2.`user_id` AND t1.`settings` = t2.`settings`)", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project123`.`public`.`user_settings` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2])
}
{
Expand All @@ -78,13 +78,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() {
assert.Len(b.T(), parts, 3)
assert.Equal(
b.T(),
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, settings ORDER BY user_id ASC, settings ASC, __artie_updated_at ASC) = 2)",
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC, __artie_updated_at ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 USING %s t2 WHERE t1.user_id = t2.user_id AND t1.settings = t2.settings", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`user_id` = t2.`user_id` AND t1.`settings` = t2.`settings`)", stagingTableID.FullyQualifiedName()), parts[1])
assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project123`.`public`.`user_settings` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2])
}
}

0 comments on commit 3975278

Please sign in to comment.