Skip to content

Commit

Permalink
Add Dialect() to DataWarehouse
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 1, 2024
1 parent 98135da commit 92af7c3
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 8 deletions.
8 changes: 6 additions & 2 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (s *Store) Label() constants.DestinationKind {
return constants.BigQuery
}

func (s *Store) Dialect() sql.Dialect {
return sql.BigQueryDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}
Expand Down Expand Up @@ -151,12 +155,12 @@ func (s *Store) putTable(ctx context.Context, tableID types.TableIdentifier, row
func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
primaryKeysEscaped = append(primaryKeysEscaped, sql.EscapeNameIfNecessary(pk, s.ShouldUppercaseEscapedNames(), s.Label()))
primaryKeysEscaped = append(primaryKeysEscaped, s.Dialect().QuoteIdentifier(pk))
}

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, sql.EscapeNameIfNecessary(constants.UpdateColumnMarker, s.ShouldUppercaseEscapedNames(), s.Label()))
orderColsToIterate = append(orderColsToIterate, s.Dialect().QuoteIdentifier(constants.UpdateColumnMarker))
}

var orderByCols []string
Expand Down
5 changes: 5 additions & 0 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
)

type Store struct {
Expand All @@ -34,6 +35,10 @@ func (s *Store) Label() constants.DestinationKind {
return constants.MSSQL
}

func (s *Store) Dialect() sql.Dialect {
return sql.DefaultDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}
Expand Down
5 changes: 5 additions & 0 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
)

Expand Down Expand Up @@ -45,6 +46,10 @@ func (s *Store) Label() constants.DestinationKind {
return constants.Redshift
}

func (s *Store) Dialect() sql.Dialect {
return sql.RedshiftDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}
Expand Down
11 changes: 5 additions & 6 deletions clients/shared/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"

"github.com/stretchr/testify/assert"

sqllib "github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/destination/types"
)

func TestGetTableCfgArgs_ShouldParseComment(t *testing.T) {
Expand Down Expand Up @@ -58,6 +56,7 @@ func TestGetTableCfgArgs_ShouldParseComment(t *testing.T) {
type MockDWH struct{}

func (MockDWH) Label() constants.DestinationKind { panic("not implemented") }
func (MockDWH) Dialect() sqllib.Dialect { panic("not implemented") }
func (MockDWH) Merge(tableData *optimization.TableData) error { panic("not implemented") }
func (MockDWH) Append(tableData *optimization.TableData) error { panic("not implemented") }
func (MockDWH) Dedupe(tableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error {
Expand Down
4 changes: 4 additions & 0 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (s *Store) Label() constants.DestinationKind {
return constants.Snowflake
}

func (s *Store) Dialect() sql.Dialect {
return sql.SnowflakeDialect{UppercaseEscNames: s.ShouldUppercaseEscapedNames()}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return s.config.SharedDestinationConfig.UppercaseEscapedNames
}
Expand Down
2 changes: 2 additions & 0 deletions lib/destination/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
sqllib "github.com/artie-labs/transfer/lib/sql"
)

type DataWarehouse interface {
Label() constants.DestinationKind
Dialect() sqllib.Dialect
Merge(tableData *optimization.TableData) error
Append(tableData *optimization.TableData) error
Dedupe(tableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error
Expand Down

0 comments on commit 92af7c3

Please sign in to comment.