diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 8913773dc..386701bcd 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -168,7 +168,7 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif var parts []string parts = append(parts, - fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`, + fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s")) AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`, stagingTableID.FullyQualifiedName(), typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)), tableID.FullyQualifiedName(), @@ -182,8 +182,9 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif whereClauses = append(whereClauses, fmt.Sprintf("t1.%s = t2.%s", primaryKeyEscaped, primaryKeyEscaped)) } + // https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#delete_with_subquery parts = append(parts, - fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s", + fmt.Sprintf("DELETE FROM %s t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE %s)", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName(), strings.Join(whereClauses, " AND "), diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index 3f905cbc7..ca7d898bd 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -5,13 +5,13 @@ import ( "strings" "time" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/clients/shared" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" - "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/typing" ) func (b *BigQueryTestSuite) TestGenerateDedupeQueries() { @@ -24,13 +24,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() { assert.Len(b.T(), parts, 3) assert.Equal( b.T(), - fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY id ASC) = 2)", + fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC) = 2)", stagingTableID.FullyQualifiedName(), fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) - assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 USING %s t2 WHERE t1.id = t2.id", stagingTableID.FullyQualifiedName()), parts[1]) + assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`id` = t2.`id`)", stagingTableID.FullyQualifiedName()), parts[1]) assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project12`.`public`.`customers` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2]) } { @@ -42,13 +42,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() { assert.Len(b.T(), parts, 3) assert.Equal( b.T(), - fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY id ASC, __artie_updated_at ASC) = 2)", + fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC, __artie_updated_at ASC) = 2)", stagingTableID.FullyQualifiedName(), fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) - assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 USING %s t2 WHERE t1.id = t2.id", stagingTableID.FullyQualifiedName()), parts[1]) + assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project12`.`public`.`customers` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`id` = t2.`id`)", stagingTableID.FullyQualifiedName()), parts[1]) assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project12`.`public`.`customers` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2]) } { @@ -60,13 +60,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() { assert.Len(b.T(), parts, 3) assert.Equal( b.T(), - fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, settings ORDER BY user_id ASC, settings ASC) = 2)", + fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC) = 2)", stagingTableID.FullyQualifiedName(), fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) - assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 USING %s t2 WHERE t1.user_id = t2.user_id AND t1.settings = t2.settings", stagingTableID.FullyQualifiedName()), parts[1]) + assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`user_id` = t2.`user_id` AND t1.`settings` = t2.`settings`)", stagingTableID.FullyQualifiedName()), parts[1]) assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project123`.`public`.`user_settings` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2]) } { @@ -78,13 +78,13 @@ func (b *BigQueryTestSuite) TestGenerateDedupeQueries() { assert.Len(b.T(), parts, 3) assert.Equal( b.T(), - fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id, settings ORDER BY user_id ASC, settings ASC, __artie_updated_at ASC) = 2)", + fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC, __artie_updated_at ASC) = 2)", stagingTableID.FullyQualifiedName(), fmt.Sprintf(`"%s"`, typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) - assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 USING %s t2 WHERE t1.user_id = t2.user_id AND t1.settings = t2.settings", stagingTableID.FullyQualifiedName()), parts[1]) + assert.Equal(b.T(), fmt.Sprintf("DELETE FROM `project123`.`public`.`user_settings` t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE t1.`user_id` = t2.`user_id` AND t1.`settings` = t2.`settings`)", stagingTableID.FullyQualifiedName()), parts[1]) assert.Equal(b.T(), fmt.Sprintf("INSERT INTO `project123`.`public`.`user_settings` SELECT * FROM %s", stagingTableID.FullyQualifiedName()), parts[2]) } }