Refactoring BigQuery Dedupe #516
Conversation
clients/bigquery/bigquery.go (outdated)

    var primaryKeysEscaped []string
    for _, pk := range primaryKeys {
        pkCol := columns.NewColumn(pk, typing.Invalid)
        primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
Could do sql.EscapeName(pk, false, s.Label()) since we always want to escape BigQuery names.
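For illustration, a minimal sketch of what the loop above would look like with that suggestion applied (the sql.EscapeName call and its argument order are taken from the comment, not verified against the codebase):

    var primaryKeysEscaped []string
    for _, pk := range primaryKeys {
        // Sketch: escape unconditionally, since BigQuery names are always escaped.
        primaryKeysEscaped = append(primaryKeysEscaped, sql.EscapeName(pk, false, s.Label()))
    }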
clients/bigquery/bigquery.go (outdated)

    orderColsToIterate := primaryKeysEscaped
    if topicConfig.IncludeArtieUpdatedAt {
        orderColsToIterate = append(orderColsToIterate, constants.UpdateColumnMarker)
I guess we probably want to escape constants.UpdateColumnMarker too.
We can escape it, but we don't need to.
I could have it call sql.EscapeNameIfNecessary so that it'll get escaped once we address escaping Artie cols?
Yep that works!
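To make that concrete, a sketch of how the order-by columns could be built (it mirrors the sql.EscapeNameIfNecessary call used for the primary keys later in this PR; illustrative, not the committed code):

    orderColsToIterate := primaryKeysEscaped
    if topicConfig.IncludeArtieUpdatedAt {
        // Sketch: run the Artie updated-at marker through the same helper so it
        // gets escaped automatically once escaping of Artie columns is addressed.
        orderColsToIterate = append(orderColsToIterate,
            sql.EscapeNameIfNecessary(constants.UpdateColumnMarker, s.ShouldUppercaseEscapedNames(), s.Label()))
    }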
        }
    }

    _ = ddl.DropTemporaryTable(s, stagingTableID.FullyQualifiedName(), false)
Do we only need to do this if the transaction was committed?
The transaction doesn't drop the table
Note that the SQL text doesn't explicitly call this a temporary table. Temp tables in BigQuery behave differently from Snowflake and Redshift.
They're regular tables that can have a TTL. This query drops the table so we don't need to wait for GC
Ah that was my guess (that the transaction doesn't drop the table).
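For context, here is a minimal sketch of the cleanup being discussed, assuming the cloud.google.com/go/bigquery client (this is not the PR's ddl.DropTemporaryTable helper). It illustrates why the drop is independent of the transaction: the staging table is an ordinary table with an expiration, and dropping it explicitly just removes it before its TTL would.

    import (
        "context"
        "fmt"

        "cloud.google.com/go/bigquery"
    )

    // dropStagingTableSketch issues an explicit DROP so we don't wait for the
    // staging table's expiration (TTL) to clean it up. Hypothetical helper name.
    func dropStagingTableSketch(ctx context.Context, client *bigquery.Client, fullyQualifiedName string) error {
        // IF EXISTS makes this safe to run even if the table is already gone.
        q := client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", fullyQualifiedName))
        _, err := q.Read(ctx)
        return err
    }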
    @@ -131,13 +130,12 @@ func (s *Store) reestablishConnection() error {
     func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
     	var primaryKeysEscaped []string
     	for _, pk := range primaryKeys {
    -		pkCol := columns.NewColumn(pk, typing.Invalid)
    -		primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
    +		primaryKeysEscaped = append(primaryKeysEscaped, sql.EscapeNameIfNecessary(pk, s.ShouldUppercaseEscapedNames(), s.Label()))
@nathan-artie Heads up - I escaped this too
Nice!
Changes
Improving the way we handle deduping in BigQuery. This way, we'll create a staging table that has all the duplicate rows and keep only the latest row in the staging table.
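To make the approach concrete, here is a rough sketch of how the generated statements could fit together. This is not the PR's actual generateDedupeQueries output; the SQL shapes, the QUALIFY/ROW_NUMBER pattern, and the helper name are assumptions for illustration. Per the discussion above, the PR also runs the dedupe inside a transaction and drops the staging table afterwards.

    package sketch

    import (
        "fmt"
        "strings"
    )

    // generateDedupeQueriesSketch returns illustrative BigQuery statements for the
    // dedupe flow described above: stage the latest row for every primary key that
    // has duplicates, delete those keys from the target table, then re-insert the
    // single surviving row per key from the staging table.
    func generateDedupeQueriesSketch(targetTable, stagingTable string, primaryKeysEscaped, orderColsEscaped []string) []string {
        pks := strings.Join(primaryKeysEscaped, ", ")

        // Order descending so ROW_NUMBER() = 1 picks the latest row; when
        // IncludeArtieUpdatedAt is set, the updated-at marker is part of this list.
        orderBy := make([]string, len(orderColsEscaped))
        for i, col := range orderColsEscaped {
            orderBy[i] = col + " DESC"
        }

        joinConds := make([]string, len(primaryKeysEscaped))
        for i, pk := range primaryKeysEscaped {
            joinConds[i] = fmt.Sprintf("t.%s = s.%s", pk, pk)
        }

        return []string{
            // 1. Staging table (a regular table with a TTL) holding one surviving
            //    row per duplicated primary key. WHERE true is needed so BigQuery
            //    accepts the bare QUALIFY clause.
            fmt.Sprintf("CREATE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)) AS SELECT * FROM %s WHERE true QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1 AND COUNT(*) OVER (PARTITION BY %s) > 1",
                stagingTable, targetTable, pks, strings.Join(orderBy, ", "), pks),
            // 2. Remove every copy of the duplicated keys from the target table.
            fmt.Sprintf("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM %s s WHERE %s)",
                targetTable, stagingTable, strings.Join(joinConds, " AND ")),
            // 3. Re-insert the surviving rows.
            fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", targetTable, stagingTable),
        }
    }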