From f076a18958b2b46dcf1337892e7be192d07087f5 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 1 May 2024 13:42:39 -0700 Subject: [PATCH] Add `Dialect()` to `DataWarehouse` (#524) --- clients/bigquery/bigquery.go | 8 ++++++-- clients/mssql/store.go | 5 +++++ clients/redshift/redshift.go | 5 +++++ clients/shared/table_config_test.go | 11 +++++------ clients/snowflake/snowflake.go | 4 ++++ lib/destination/dwh.go | 2 ++ 6 files changed, 27 insertions(+), 8 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 97cdc36e2..51078d4ce 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -115,6 +115,10 @@ func (s *Store) Label() constants.DestinationKind { return constants.BigQuery } +func (s *Store) Dialect() sql.Dialect { + return sql.BigQueryDialect{} +} + func (s *Store) ShouldUppercaseEscapedNames() bool { return false } @@ -151,12 +155,12 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { var primaryKeysEscaped []string for _, pk := range primaryKeys { - primaryKeysEscaped = append(primaryKeysEscaped, sql.EscapeNameIfNecessary(pk, s.ShouldUppercaseEscapedNames(), s.Label())) + primaryKeysEscaped = append(primaryKeysEscaped, s.Dialect().QuoteIdentifier(pk)) } orderColsToIterate := primaryKeysEscaped if topicConfig.IncludeArtieUpdatedAt { - orderColsToIterate = append(orderColsToIterate, sql.EscapeNameIfNecessary(constants.UpdateColumnMarker, s.ShouldUppercaseEscapedNames(), s.Label())) + orderColsToIterate = append(orderColsToIterate, s.Dialect().QuoteIdentifier(constants.UpdateColumnMarker)) } var orderByCols []string diff --git a/clients/mssql/store.go b/clients/mssql/store.go index ffd5725c5..f48c26d40 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -13,6 +13,7 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" + "github.com/artie-labs/transfer/lib/sql" ) type Store struct { @@ -34,6 +35,10 @@ func (s *Store) Label() constants.DestinationKind { return constants.MSSQL } +func (s *Store) Dialect() sql.Dialect { + return sql.DefaultDialect{} +} + func (s *Store) ShouldUppercaseEscapedNames() bool { return false } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 34acb73f4..952d89007 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -15,6 +15,7 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" + "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/stringutil" ) @@ -45,6 +46,10 @@ func (s *Store) Label() constants.DestinationKind { return constants.Redshift } +func (s *Store) Dialect() sql.Dialect { + return sql.RedshiftDialect{} +} + func (s *Store) ShouldUppercaseEscapedNames() bool { return false } diff --git a/clients/shared/table_config_test.go b/clients/shared/table_config_test.go index 1d8b26d81..db45a710e 100644 --- a/clients/shared/table_config_test.go +++ b/clients/shared/table_config_test.go @@ -5,18 +5,16 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/ptr" - - "github.com/stretchr/testify/assert" - + sqllib "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/artie-labs/transfer/lib/destination/types" ) func TestGetTableCfgArgs_ShouldParseComment(t *testing.T) { @@ -58,6 +56,7 @@ func TestGetTableCfgArgs_ShouldParseComment(t *testing.T) { type MockDWH struct{} func (MockDWH) Label() constants.DestinationKind { panic("not implemented") } +func (MockDWH) Dialect() sqllib.Dialect { panic("not implemented") } func (MockDWH) Merge(tableData *optimization.TableData) error { panic("not implemented") } func (MockDWH) Append(tableData *optimization.TableData) error { panic("not implemented") } func (MockDWH) Dedupe(tableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index 047da9dc0..14d859b06 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -77,6 +77,10 @@ func (s *Store) Label() constants.DestinationKind { return constants.Snowflake } +func (s *Store) Dialect() sql.Dialect { + return sql.SnowflakeDialect{UppercaseEscNames: s.ShouldUppercaseEscapedNames()} +} + func (s *Store) ShouldUppercaseEscapedNames() bool { return s.config.SharedDestinationConfig.UppercaseEscapedNames } diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 3a0acfa40..00a86b959 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -7,10 +7,12 @@ import ( "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" + sqllib "github.com/artie-labs/transfer/lib/sql" ) type DataWarehouse interface { Label() constants.DestinationKind + Dialect() sqllib.Dialect Merge(tableData *optimization.TableData) error Append(tableData *optimization.TableData) error Dedupe(tableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error