Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 30, 2024
1 parent 82f61c6 commit 8f7687b
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/typing/columns"

"cloud.google.com/go/bigquery"
_ "github.com/viant/bigquery"

Expand Down Expand Up @@ -150,7 +152,8 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row
func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
primaryKeysEscaped = append(primaryKeysEscaped, pk)
pkCol := columns.NewColumn(pk, typing.Invalid)
primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
}

orderColsToIterate := primaryKeysEscaped
Expand All @@ -164,24 +167,28 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif
}

var parts []string
parts = append(parts, fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`,
stagingTableID.FullyQualifiedName(),
typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(orderByCols, ", "),
))
parts = append(parts,
fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`,
stagingTableID.FullyQualifiedName(),
typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(orderByCols, ", "),
),
)

var whereClauses []string
for _, primaryKeyEscaped := range primaryKeysEscaped {
whereClauses = append(whereClauses, fmt.Sprintf("t1.%s = t2.%s", primaryKeyEscaped, primaryKeyEscaped))
}

parts = append(parts, fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s",
tableID.FullyQualifiedName(),
stagingTableID.FullyQualifiedName(),
strings.Join(whereClauses, " AND "),
))
parts = append(parts,
fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s",
tableID.FullyQualifiedName(),
stagingTableID.FullyQualifiedName(),
strings.Join(whereClauses, " AND "),
),
)

parts = append(parts, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName()))
return parts
Expand Down

0 comments on commit 8f7687b

Please sign in to comment.