From 8f7687b56919ff4be7574e2cfb8a82995cf24f54 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 30 Apr 2024 10:35:45 -0700 Subject: [PATCH] Clean up. --- clients/bigquery/bigquery.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 5b5e263cb..8913773dc 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/artie-labs/transfer/lib/typing/columns" + "cloud.google.com/go/bigquery" _ "github.com/viant/bigquery" @@ -150,7 +152,8 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { var primaryKeysEscaped []string for _, pk := range primaryKeys { - primaryKeysEscaped = append(primaryKeysEscaped, pk) + pkCol := columns.NewColumn(pk, typing.Invalid) + primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label())) } orderColsToIterate := primaryKeysEscaped @@ -164,24 +167,28 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif } var parts []string - parts = append(parts, fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`, - stagingTableID.FullyQualifiedName(), - typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)), - tableID.FullyQualifiedName(), - strings.Join(primaryKeysEscaped, ", "), - strings.Join(orderByCols, ", "), - )) + parts = append(parts, + fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s") AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`, + stagingTableID.FullyQualifiedName(), + typing.ExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)), + tableID.FullyQualifiedName(), + strings.Join(primaryKeysEscaped, ", "), + strings.Join(orderByCols, ", "), + ), + ) var whereClauses []string for _, primaryKeyEscaped := range primaryKeysEscaped { whereClauses = append(whereClauses, fmt.Sprintf("t1.%s = t2.%s", primaryKeyEscaped, primaryKeyEscaped)) } - parts = append(parts, fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s", - tableID.FullyQualifiedName(), - stagingTableID.FullyQualifiedName(), - strings.Join(whereClauses, " AND "), - )) + parts = append(parts, + fmt.Sprintf("DELETE FROM %s t1 USING %s t2 WHERE %s", + tableID.FullyQualifiedName(), + stagingTableID.FullyQualifiedName(), + strings.Join(whereClauses, " AND "), + ), + ) parts = append(parts, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName())) return parts