Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring BigQuery Dedupe #516

Merged
merged 7 commits into from
Apr 30, 2024
Merged

Refactoring BigQuery Dedupe #516

merged 7 commits into from
Apr 30, 2024

Conversation

Tang8330
Copy link
Contributor

@Tang8330 Tang8330 commented Apr 30, 2024

Changes

Improving the way we handle deduping in BigQuery.

This way, we'll create a staging table that has all the duplicate rows.

  • It stores the latest row in the staging table
  • We then delete all the duplicate rows from the target table
  • We then copy the data from staging -> target
  • Drop staging table

@Tang8330 Tang8330 marked this pull request as ready for review April 30, 2024 18:05
@Tang8330 Tang8330 requested a review from nathan-artie April 30, 2024 18:06
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
pkCol := columns.NewColumn(pk, typing.Invalid)
primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do sql.EscapeName(pk, false, s.Label()) since we always want escape BigQuery names.


orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, constants.UpdateColumnMarker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we probably want to escape constants.UpdateColumnMarker too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can escape it, but we don't need to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have it call sql.EscapeNameIfNecessary so that it'll get escaped once we address escaping Artie cols?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that works!

}
}

_ = ddl.DropTemporaryTable(s, stagingTableID.FullyQualifiedName(), false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we only need to do this if the transaction was committed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transaction doesn't drop the table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the SQL text doesn't explicitly call this a temporary table. Temp tables in BQ behaves differently from Snowflake and Redshift.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're regular tables that can have a TTL. This query drops the table so we don't need to wait for GC

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that was my guess (that the transaction doesn't drop the table).

@@ -131,13 +130,12 @@ func (s *Store) reestablishConnection() error {
func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
pkCol := columns.NewColumn(pk, typing.Invalid)
primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
primaryKeysEscaped = append(primaryKeysEscaped, sql.EscapeNameIfNecessary(pk, s.ShouldUppercaseEscapedNames(), s.Label()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nathan-artie Heads up - I escaped this too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@Tang8330 Tang8330 merged commit 3d29172 into master Apr 30, 2024
1 check passed
@Tang8330 Tang8330 deleted the bigquery-dedupe branch April 30, 2024 20:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants