From 8b63f5da6e93c26e6d42a4cb3b8f211145931bdd Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 10 May 2024 16:10:01 -0700 Subject: [PATCH] [bigquery] Move `BigQueryDialect` to `client/bigquery` --- clients/bigquery/bigquery.go | 11 ++++++----- clients/bigquery/bigquery_dedupe_test.go | 18 +++++++++--------- clients/bigquery/cast.go | 13 +++++-------- .../bigquery/dialect/dialect.go | 7 ++++--- .../bigquery/dialect/dialect_test.go | 2 +- clients/bigquery/tableid.go | 10 +++++----- clients/shared/utils.go | 3 ++- lib/destination/ddl/ddl.go | 9 ++++----- lib/destination/ddl/ddl_temp_test.go | 4 ++-- lib/destination/dml/columns_test.go | 13 +++++++------ lib/destination/dml/merge.go | 3 ++- lib/destination/dml/merge_bigquery_test.go | 8 ++++---- lib/destination/dml/merge_test.go | 3 ++- lib/destination/dml/merge_valid_test.go | 12 +++++------- lib/sql/util_test.go | 4 ++-- lib/typing/columns/default_test.go | 11 +++++------ 16 files changed, 65 insertions(+), 66 deletions(-) rename lib/sql/bigquery.go => clients/bigquery/dialect/dialect.go (96%) rename lib/sql/bigquery_test.go => clients/bigquery/dialect/dialect_test.go (99%) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index a8ec15e95..a9d0d7e08 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -11,6 +11,7 @@ import ( "cloud.google.com/go/bigquery" _ "github.com/viant/bigquery" + "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" @@ -115,7 +116,7 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap { } func (s *Store) Dialect() sql.Dialect { - return sql.BigQueryDialect{} + return dialect.BigQueryDialect{} } func (s *Store) AdditionalDateFormats() []string { @@ -151,12 +152,12 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row return nil } -func generateDedupeQueries(dialect sql.Dialect, tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { - primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, dialect) +func generateDedupeQueries(_dialect sql.Dialect, tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { + primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, _dialect) orderColsToIterate := primaryKeysEscaped if topicConfig.IncludeArtieUpdatedAt { - orderColsToIterate = append(orderColsToIterate, dialect.QuoteIdentifier(constants.UpdateColumnMarker)) + orderColsToIterate = append(orderColsToIterate, _dialect.QuoteIdentifier(constants.UpdateColumnMarker)) } var orderByCols []string @@ -168,7 +169,7 @@ func generateDedupeQueries(dialect sql.Dialect, tableID, stagingTableID types.Ta parts = append(parts, fmt.Sprintf(`CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP("%s")) AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2)`, stagingTableID.FullyQualifiedName(), - sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)), + dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)), tableID.FullyQualifiedName(), strings.Join(primaryKeysEscaped, ", "), strings.Join(orderByCols, ", "), diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index 9bf82b483..c67cac2e1 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -8,10 +8,10 @@ import ( "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/stringutil" ) @@ -21,13 +21,13 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) assert.Len(t, parts, 3) assert.Equal( t, fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC) = 2)", stagingTableID.FullyQualifiedName(), - fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), + fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) @@ -39,13 +39,13 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) assert.Len(t, parts, 3) assert.Equal( t, fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project12`.`public`.`customers` QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `id` ASC, `__artie_updated_at` ASC) = 2)", stagingTableID.FullyQualifiedName(), - fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), + fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) @@ -57,13 +57,13 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) assert.Len(t, parts, 3) assert.Equal( t, fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC) = 2)", stagingTableID.FullyQualifiedName(), - fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), + fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) @@ -75,13 +75,13 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := generateDedupeQueries(sql.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := generateDedupeQueries(dialect.BigQueryDialect{}, tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) assert.Len(t, parts, 3) assert.Equal( t, fmt.Sprintf("CREATE OR REPLACE TABLE %s OPTIONS (expiration_timestamp = TIMESTAMP(%s)) AS (SELECT * FROM `project123`.`public`.`user_settings` QUALIFY ROW_NUMBER() OVER (PARTITION BY `user_id`, `settings` ORDER BY `user_id` ASC, `settings` ASC, `__artie_updated_at` ASC) = 2)", stagingTableID.FullyQualifiedName(), - fmt.Sprintf(`"%s"`, sql.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), + fmt.Sprintf(`"%s"`, dialect.BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL))), ), parts[0], ) diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 25cf61df3..c667e99d3 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -4,16 +4,13 @@ import ( "fmt" "strings" - "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/typing/decimal" - - "github.com/artie-labs/transfer/lib/typing/columns" - + "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing/ext" - "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/artie-labs/transfer/lib/typing/ext" ) func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) (any, error) { @@ -55,7 +52,7 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) return extTime.String(ext.PostgresDateFormat), nil case ext.TimeKindType: - return extTime.String(sql.BQStreamingTimeFormat), nil + return extTime.String(dialect.BQStreamingTimeFormat), nil } case typing.Struct.Kind: if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { diff --git a/lib/sql/bigquery.go b/clients/bigquery/dialect/dialect.go similarity index 96% rename from lib/sql/bigquery.go rename to clients/bigquery/dialect/dialect.go index 8b78d3843..4b6a55a11 100644 --- a/lib/sql/bigquery.go +++ b/clients/bigquery/dialect/dialect.go @@ -1,4 +1,4 @@ -package sql +package dialect import ( "fmt" @@ -6,6 +6,7 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" ) @@ -28,7 +29,7 @@ func (BigQueryDialect) QuoteIdentifier(identifier string) string { } func (BigQueryDialect) EscapeStruct(value string) string { - return "JSON" + QuoteLiteral(value) + return "JSON" + sql.QuoteLiteral(value) } func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string { @@ -68,7 +69,7 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD return typing.Invalid, nil } - bqType, parameters, err := ParseDataTypeDefinition(strings.ToLower(rawBqType)) + bqType, parameters, err := sql.ParseDataTypeDefinition(strings.ToLower(rawBqType)) if err != nil { return typing.Invalid, err } diff --git a/lib/sql/bigquery_test.go b/clients/bigquery/dialect/dialect_test.go similarity index 99% rename from lib/sql/bigquery_test.go rename to clients/bigquery/dialect/dialect_test.go index 342b2795d..89c341427 100644 --- a/lib/sql/bigquery_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -1,4 +1,4 @@ -package sql +package dialect import ( "fmt" diff --git a/clients/bigquery/tableid.go b/clients/bigquery/tableid.go index deb04d1f4..7ffe62030 100644 --- a/clients/bigquery/tableid.go +++ b/clients/bigquery/tableid.go @@ -3,11 +3,11 @@ package bigquery import ( "fmt" + "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/sql" ) -var dialect = sql.BigQueryDialect{} +var _dialect = dialect.BigQueryDialect{} type TableIdentifier struct { projectID string @@ -32,7 +32,7 @@ func (ti TableIdentifier) Dataset() string { } func (ti TableIdentifier) EscapedTable() string { - return dialect.QuoteIdentifier(ti.table) + return _dialect.QuoteIdentifier(ti.table) } func (ti TableIdentifier) Table() string { @@ -47,8 +47,8 @@ func (ti TableIdentifier) FullyQualifiedName() string { // The fully qualified name for BigQuery is: project_id.dataset.tableName. // We are escaping the project_id, dataset, and table because there could be special characters. return fmt.Sprintf("%s.%s.%s", - dialect.QuoteIdentifier(ti.projectID), - dialect.QuoteIdentifier(ti.dataset), + _dialect.QuoteIdentifier(ti.projectID), + _dialect.QuoteIdentifier(ti.dataset), ti.EscapedTable(), ) } diff --git a/clients/shared/utils.go b/clients/shared/utils.go index e48e4c2b3..f014d848c 100644 --- a/clients/shared/utils.go +++ b/clients/shared/utils.go @@ -4,6 +4,7 @@ import ( "fmt" "log/slog" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/sql" @@ -44,7 +45,7 @@ func BackfillColumn(dwh destination.DataWarehouse, column columns.Column, tableI } query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`) - if _, ok := dwh.Dialect().(sql.BigQueryDialect); ok { + if _, ok := dwh.Dialect().(bigQueryDialect.BigQueryDialect); ok { query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);", // ALTER TABLE table ALTER COLUMN col set OPTIONS (description=...) tableID.FullyQualifiedName(), escapedCol, `{"backfilled": true}`, diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 7f6a65e44..c0f7eabfb 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -6,14 +6,13 @@ import ( "strings" "time" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/sql" - - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/types" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/typing/columns" ) // DropTemporaryTable - this will drop the temporary table from Snowflake w/ stages and BigQuery @@ -110,7 +109,7 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col if len(pkCols) > 0 { pkStatement := fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(pkCols, ", ")) - if _, ok := a.Dialect.(sql.BigQueryDialect); ok { + if _, ok := a.Dialect.(bigQueryDialect.BigQueryDialect); ok { pkStatement += " NOT ENFORCED" } diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index a00e3d83d..1dc6cbae8 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -6,12 +6,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/clients/bigquery" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/snowflake" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -24,7 +24,7 @@ func (d *DDLTestSuite) TestValidate_AlterTableArgs() { } assert.ErrorContains(d.T(), a.Validate(), "dialect cannot be nil") - a.Dialect = sql.BigQueryDialect{} + a.Dialect = bigQueryDialect.BigQueryDialect{} assert.ErrorContains(d.T(), a.Validate(), "incompatible operation - cannot drop columns and create table at the same time") a.ColumnOp = constants.Add diff --git a/lib/destination/dml/columns_test.go b/lib/destination/dml/columns_test.go index a51f2dfd1..2646c1bee 100644 --- a/lib/destination/dml/columns_test.go +++ b/lib/destination/dml/columns_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -12,11 +13,11 @@ import ( ) func TestQuoteColumns(t *testing.T) { - assert.Equal(t, []string{}, quoteColumns(nil, sql.BigQueryDialect{})) + assert.Equal(t, []string{}, quoteColumns(nil, bigQueryDialect.BigQueryDialect{})) assert.Equal(t, []string{}, quoteColumns(nil, sql.SnowflakeDialect{})) cols := []columns.Column{columns.NewColumn("a", typing.Invalid), columns.NewColumn("b", typing.Invalid)} - assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, sql.BigQueryDialect{})) + assert.Equal(t, []string{"`a`", "`b`"}, quoteColumns(cols, bigQueryDialect.BigQueryDialect{})) assert.Equal(t, []string{`"A"`, `"B"`}, quoteColumns(cols, sql.SnowflakeDialect{})) } @@ -152,13 +153,13 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { { name: "struct, string and toast string (bigquery)", columns: lastCaseColTypes, - dialect: sql.BigQueryDialect{}, + dialect: bigQueryDialect.BigQueryDialect{}, expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`", }, { name: "struct, string and toast string (bigquery) w/ reserved keywords", columns: lastCaseEscapeTypes, - dialect: sql.BigQueryDialect{}, + dialect: bigQueryDialect.BigQueryDialect{}, expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`a1`) != '%s', true) THEN cc.`a1` ELSE c.`a1` END,`b2`= CASE WHEN COALESCE(cc.`b2` != '__debezium_unavailable_value', true) THEN cc.`b2` ELSE c.`b2` END,`c3`=cc.`c3`,%s,%s", key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(cc.`start`) != '%s', true) THEN cc.`start` ELSE c.`start` END", key), "`select`=cc.`select`,`__artie_delete`=cc.`__artie_delete`"), }, @@ -172,14 +173,14 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { func TestBuildProcessToastStructColExpression(t *testing.T) { assert.Equal(t, `CASE WHEN COALESCE(cc.foo != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN cc.foo ELSE c.foo END`, sql.RedshiftDialect{}.BuildProcessToastStructColExpression("foo")) - assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, sql.BigQueryDialect{}.BuildProcessToastStructColExpression("foo")) + assert.Equal(t, `CASE WHEN COALESCE(TO_JSON_STRING(cc.foo) != '{"key":"__debezium_unavailable_value"}', true) THEN cc.foo ELSE c.foo END`, bigQueryDialect.BigQueryDialect{}.BuildProcessToastStructColExpression("foo")) assert.Equal(t, `CASE WHEN COALESCE(cc.foo != {'key': '__debezium_unavailable_value'}, true) THEN cc.foo ELSE c.foo END`, sql.SnowflakeDialect{}.BuildProcessToastStructColExpression("foo")) assert.Equal(t, `CASE WHEN COALESCE(cc.foo, {}) != {'key': '__debezium_unavailable_value'} THEN cc.foo ELSE c.foo END`, sql.MSSQLDialect{}.BuildProcessToastStructColExpression("foo")) } func TestProcessToastCol(t *testing.T) { assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.RedshiftDialect{})) - assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.BigQueryDialect{})) + assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", bigQueryDialect.BigQueryDialect{})) assert.Equal(t, `CASE WHEN COALESCE(cc.bar != '__debezium_unavailable_value', true) THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.SnowflakeDialect{})) assert.Equal(t, `CASE WHEN COALESCE(cc.bar, '') != '__debezium_unavailable_value' THEN cc.bar ELSE c.bar END`, processToastCol("bar", sql.MSSQLDialect{})) } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index bacb4363a..5cc6ae193 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" @@ -174,7 +175,7 @@ func (m *MergeArgument) buildDefaultStatement() (string, error) { idempotentClause = fmt.Sprintf("AND cc.%s >= c.%s ", m.IdempotentKey, m.IdempotentKey) } - _, isBigQuery := m.Dialect.(sql.BigQueryDialect) + _, isBigQuery := m.Dialect.(dialect.BigQueryDialect) var equalitySQLParts []string for _, primaryKey := range m.PrimaryKeys { diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go index f12d6c4f9..09ae50805 100644 --- a/lib/destination/dml/merge_bigquery_test.go +++ b/lib/destination/dml/merge_bigquery_test.go @@ -3,9 +3,9 @@ package dml import ( "testing" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/stretchr/testify/assert" @@ -25,7 +25,7 @@ func TestMergeStatement_TempTable(t *testing.T) { SubQuery: "customers.orders_tmp", PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)}, Columns: cols.ValidColumns(), - Dialect: sql.BigQueryDialect{}, + Dialect: bigQueryDialect.BigQueryDialect{}, SoftDelete: false, } @@ -50,7 +50,7 @@ func TestMergeStatement_JSONKey(t *testing.T) { SubQuery: "customers.orders_tmp", PrimaryKeys: []columns.Column{orderOIDCol}, Columns: cols.ValidColumns(), - Dialect: sql.BigQueryDialect{}, + Dialect: bigQueryDialect.BigQueryDialect{}, SoftDelete: false, } @@ -71,7 +71,7 @@ func TestMergeArgument_BuildStatements_BigQuery(t *testing.T) { SubQuery: "{SUB_QUERY}", PrimaryKeys: []columns.Column{orderOIDCol}, Columns: cols.ValidColumns(), - Dialect: sql.BigQueryDialect{}, + Dialect: bigQueryDialect.BigQueryDialect{}, SoftDelete: false, } diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 1ce8dbf59..1a406216d 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/sql" @@ -366,7 +367,7 @@ func TestMergeArgument_BuildRedshiftDeleteQuery(t *testing.T) { func TestMergeArgument_BuildStatements_Validation(t *testing.T) { for _, arg := range []*MergeArgument{ {Dialect: sql.SnowflakeDialect{}}, - {Dialect: sql.BigQueryDialect{}}, + {Dialect: bigQueryDialect.BigQueryDialect{}}, } { parts, err := arg.BuildStatements() assert.ErrorContains(t, err, "merge argument does not contain primary keys") diff --git a/lib/destination/dml/merge_valid_test.go b/lib/destination/dml/merge_valid_test.go index 9639c34ea..69a95bedc 100644 --- a/lib/destination/dml/merge_valid_test.go +++ b/lib/destination/dml/merge_valid_test.go @@ -3,14 +3,12 @@ package dml import ( "testing" - "github.com/artie-labs/transfer/lib/mocks" - - "github.com/artie-labs/transfer/lib/sql" + "github.com/stretchr/testify/assert" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" + "github.com/artie-labs/transfer/lib/mocks" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/stretchr/testify/assert" ) func TestMergeArgument_Valid(t *testing.T) { @@ -88,7 +86,7 @@ func TestMergeArgument_Valid(t *testing.T) { Columns: cols, SubQuery: "schema.tableName", TableID: &mocks.FakeTableIdentifier{}, - Dialect: sql.BigQueryDialect{}, + Dialect: bigQueryDialect.BigQueryDialect{}, }, }, { @@ -98,7 +96,7 @@ func TestMergeArgument_Valid(t *testing.T) { Columns: []columns.Column{columns.NewColumn("id", typing.Invalid)}, SubQuery: "schema.tableName", TableID: &mocks.FakeTableIdentifier{}, - Dialect: sql.BigQueryDialect{}, + Dialect: bigQueryDialect.BigQueryDialect{}, }, expectedErr: `column "id" is invalid and should be skipped`, }, diff --git a/lib/sql/util_test.go b/lib/sql/util_test.go index c5913145a..1b017dd07 100644 --- a/lib/sql/util_test.go +++ b/lib/sql/util_test.go @@ -40,8 +40,8 @@ func TestQuoteLiteral(t *testing.T) { } func TestQuoteIdentifiers(t *testing.T) { - assert.Equal(t, []string{}, QuoteIdentifiers([]string{}, BigQueryDialect{})) - assert.Equal(t, []string{"`a`", "`b`", "`c`"}, QuoteIdentifiers([]string{"a", "b", "c"}, BigQueryDialect{})) + assert.Equal(t, []string{}, QuoteIdentifiers([]string{}, RedshiftDialect{})) + assert.Equal(t, []string{`"a"`, `"b"`, `"c"`}, QuoteIdentifiers([]string{"a", "b", "c"}, RedshiftDialect{})) } func TestParseDataTypeDefinition(t *testing.T) { diff --git a/lib/typing/columns/default_test.go b/lib/typing/columns/default_test.go index 0da9f8cfe..9bf596527 100644 --- a/lib/typing/columns/default_test.go +++ b/lib/typing/columns/default_test.go @@ -5,17 +5,16 @@ import ( "testing" "time" - "github.com/artie-labs/transfer/lib/sql" - - "github.com/artie-labs/transfer/lib/typing/ext" + "github.com/stretchr/testify/assert" + bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" + "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" - - "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/typing/ext" ) var dialects = []sql.Dialect{ - sql.BigQueryDialect{}, + bigQueryDialect.BigQueryDialect{}, sql.RedshiftDialect{}, sql.SnowflakeDialect{}, }