Skip to content

Commit

Permalink
[bigquery] Move BigQueryDialect to client/bigquery
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 10, 2024
1 parent 8c7cc5d commit 8b63f5d
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 66 deletions.
11 changes: 6 additions & 5 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"cloud.google.com/go/bigquery"
_ "github.com/viant/bigquery"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
}

func (s *Store) Dialect() sql.Dialect {
return sql.BigQueryDialect{}
return dialect.BigQueryDialect{}
}

func (s *Store) AdditionalDateFormats() []string {
Expand Down Expand Up @@ -151,12 +152,12 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row
return nil
}

func generateDedupeQueries(dialect sql.Dialect, tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, dialect)
func generateDedupeQueries(_dialect sql.Dialect, tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, _dialect)

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, dialect.QuoteIdentifier(constants.UpdateColumnMarker))
orderColsToIterate = append(orderColsToIterate, _dialect.QuoteIdentifier(constants.UpdateColumnMarker))
}

var orderByCols []string
Expand All @@ -168,7 +169,7 @@ func generateDedupeQueries(dialect sql.Dialect, tableID, stagingTableID types.Ta
parts = append(parts,
fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s")) AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`,
stagingTableID.FullyQualifiedName(),
sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(orderByCols, ", "),
Expand Down
18 changes: 9 additions & 9 deletions clients/bigquery/bigquery_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

Expand All @@ -21,13 +21,13 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{})
assert.Len(t, parts, 3)
assert.Equal(
t,
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
Expand All @@ -39,13 +39,13 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project12", "public", "customers")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
assert.Len(t, parts, 3)
assert.Equal(
t,
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC, `__artie_updated_at` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
Expand All @@ -57,13 +57,13 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{})
assert.Len(t, parts, 3)
assert.Equal(
t,
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
Expand All @@ -75,13 +75,13 @@ func TestGenerateDedupeQueries(t *testing.T) {
tableID := NewTableIdentifier("project123", "public", "user_settings")
stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5)))

parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true})
assert.Len(t, parts, 3)
assert.Equal(
t,
fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC, `__artie_updated_at` ASC) = 2)",
stagingTableID.FullyQualifiedName(),
fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))),
),
parts[0],
)
Expand Down
13 changes: 5 additions & 8 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing/decimal"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) {
Expand Down Expand Up @@ -55,7 +52,7 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)

return extTime.String(ext.PostgresDateFormat), nil
case ext.TimeKindType:
return extTime.String(sql.BQStreamingTimeFormat), nil
return extTime.String(dialect.BQStreamingTimeFormat), nil
}
case typing.Struct.Kind:
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
Expand Down
7 changes: 4 additions & 3 deletions lib/sql/bigquery.go → clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package sql
package dialect

import (
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)
Expand All @@ -28,7 +29,7 @@ func (BigQueryDialect) QuoteIdentifier(identifier string) string {
}

func (BigQueryDialect) EscapeStruct(value string) string {
return "JSON" + QuoteLiteral(value)
return "JSON" + sql.QuoteLiteral(value)
}

func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
return typing.Invalid, nil
}

bqType, parameters, err := ParseDataTypeDefinition(strings.ToLower(rawBqType))
bqType, parameters, err := sql.ParseDataTypeDefinition(strings.ToLower(rawBqType))
if err != nil {
return typing.Invalid, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package dialect

import (
"fmt"
Expand Down
10 changes: 5 additions & 5 deletions clients/bigquery/tableid.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package bigquery
import (
"fmt"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/sql"
)

var dialect = sql.BigQueryDialect{}
var _dialect = dialect.BigQueryDialect{}

type TableIdentifier struct {
projectID string
Expand All @@ -32,7 +32,7 @@ func (ti TableIdentifier) Dataset() string {
}

func (ti TableIdentifier) EscapedTable() string {
return dialect.QuoteIdentifier(ti.table)
return _dialect.QuoteIdentifier(ti.table)
}

func (ti TableIdentifier) Table() string {
Expand All @@ -47,8 +47,8 @@ func (ti TableIdentifier) FullyQualifiedName() string {
// The fully qualified name for BigQuery is: project_id.dataset.tableName.
// We are escaping the project_id, dataset, and table because there could be special characters.
return fmt.Sprintf("%s.%s.%s",
dialect.QuoteIdentifier(ti.projectID),
dialect.QuoteIdentifier(ti.dataset),
_dialect.QuoteIdentifier(ti.projectID),
_dialect.QuoteIdentifier(ti.dataset),
ti.EscapedTable(),
)
}
3 changes: 2 additions & 1 deletion clients/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log/slog"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/sql"
Expand Down Expand Up @@ -44,7 +45,7 @@ func BackfillColumn(dwh destination.DataWarehouse, column columns.Column, tableI
}

query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`)
if _, ok := dwh.Dialect().(sql.BigQueryDialect); ok {
if _, ok := dwh.Dialect().(bigQueryDialect.BigQueryDialect); ok {
query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);",
// ALTER TABLE table ALTER COLUMN col set OPTIONS (description=...)
tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`,
Expand Down
9 changes: 4 additions & 5 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"strings"
"time"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing/columns"
)

// DropTemporaryTable - this will drop the temporary table from Snowflake w/ stages and BigQuery
Expand Down Expand Up @@ -110,7 +109,7 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col

if len(pkCols) > 0 {
pkStatement := fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(pkCols, ", "))
if _, ok := a.Dialect.(sql.BigQueryDialect); ok {
if _, ok := a.Dialect.(bigQueryDialect.BigQueryDialect); ok {
pkStatement += " NOT ENFORCED"
}

Expand Down
4 changes: 2 additions & 2 deletions lib/destination/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/clients/bigquery"
bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/snowflake"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)
Expand All @@ -24,7 +24,7 @@ func (d *DDLTestSuite) TestValidate_AlterTableArgs() {
}
assert.ErrorContains(d.T(), a.Validate(), "dialect cannot be nil")

a.Dialect = sql.BigQueryDialect{}
a.Dialect = bigQueryDialect.BigQueryDialect{}
assert.ErrorContains(d.T(), a.Validate(), "incompatible operation - cannot drop columns and create table at the same time")

a.ColumnOp = constants.Add
Expand Down
13 changes: 7 additions & 6 deletions lib/destination/dml/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
Expand All @@ -12,11 +13,11 @@ import (
)

func TestQuoteColumns(t *testing.T) {
assert.Equal(t, []string{}, quoteColumns(nil, sql.BigQueryDialect{}))
assert.Equal(t, []string{}, quoteColumns(nil, bigQueryDialect.BigQueryDialect{}))
assert.Equal(t, []string{}, quoteColumns(nil, sql.SnowflakeDialect{}))

cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)}
assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, sql.BigQueryDialect{}))
assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, bigQueryDialect.BigQueryDialect{}))
assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{}))
}

Expand Down Expand Up @@ -152,13 +153,13 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
{
name: "struct, string and toast string (bigquery)",
columns: lastCaseColTypes,
dialect: sql.BigQueryDialect{},
dialect: bigQueryDialect.BigQueryDialect{},
expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`",
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
dialect: sql.BigQueryDialect{},
dialect: bigQueryDialect.BigQueryDialect{},
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"),
},
Expand All @@ -172,14 +173,14 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {

func TestBuildProcessToastStructColExpression(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, sql.RedshiftDialect{}.BuildProcessToastStructColExpression("foo"))
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, sql.BigQueryDialect{}.BuildProcessToastStructColExpression("foo"))
assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, bigQueryDialect.BigQueryDialect{}.BuildProcessToastStructColExpression("foo"))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, sql.SnowflakeDialect{}.BuildProcessToastStructColExpression("foo"))
assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, sql.MSSQLDialect{}.BuildProcessToastStructColExpression("foo"))
}

func TestProcessToastCol(t *testing.T) {
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", bigQueryDialect.BigQueryDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{}))
assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{}))
}
3 changes: 2 additions & 1 deletion lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
Expand Down Expand Up @@ -174,7 +175,7 @@ func (m *MergeArgument) buildDefaultStatement() (string, error) {
idempotentClause = fmt.Sprintf("AND cc.%s >= c.%s ", m.IdempotentKey, m.IdempotentKey)
}

_, isBigQuery := m.Dialect.(sql.BigQueryDialect)
_, isBigQuery := m.Dialect.(dialect.BigQueryDialect)

var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
Expand Down
8 changes: 4 additions & 4 deletions lib/destination/dml/merge_bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package dml
import (
"testing"

bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/stretchr/testify/assert"
Expand All @@ -25,7 +25,7 @@ func TestMergeStatement_TempTable(t *testing.T) {
SubQuery: "customers.orders_tmp",
PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)},
Columns: cols.ValidColumns(),
Dialect: sql.BigQueryDialect{},
Dialect: bigQueryDialect.BigQueryDialect{},
SoftDelete: false,
}

Expand All @@ -50,7 +50,7 @@ func TestMergeStatement_JSONKey(t *testing.T) {
SubQuery: "customers.orders_tmp",
PrimaryKeys: []columns.Column{orderOIDCol},
Columns: cols.ValidColumns(),
Dialect: sql.BigQueryDialect{},
Dialect: bigQueryDialect.BigQueryDialect{},
SoftDelete: false,
}

Expand All @@ -71,7 +71,7 @@ func TestMergeArgument_BuildStatements_BigQuery(t *testing.T) {
SubQuery: "{SUB_QUERY}",
PrimaryKeys: []columns.Column{orderOIDCol},
Columns: cols.ValidColumns(),
Dialect: sql.BigQueryDialect{},
Dialect: bigQueryDialect.BigQueryDialect{},
SoftDelete: false,
}

Expand Down
Loading

0 comments on commit 8b63f5d

Please sign in to comment.