Skip to content

Commit

Permalink
Kill uppercase escape name
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Apr 29, 2024
1 parent e01e7c5 commit 4bfd93c
Show file tree
Hide file tree
Showing 35 changed files with 299 additions and 430 deletions.
19 changes: 7 additions & 12 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ type Store struct {
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down Expand Up @@ -110,10 +109,6 @@ func (s *Store) Label() constants.DestinationKind {
return constants.BigQuery
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}

func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
client, err := bigquery.NewClient(ctx, s.config.BigQuery.ProjectID)
if err != nil {
Expand Down
16 changes: 7 additions & 9 deletions clients/mssql/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ func (s *Store) Label() constants.DestinationKind {
return constants.MSSQL
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}

func (s *Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, s.config, types.MergeOpts{})
}
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/tableid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (ti TableIdentifier) FullyQualifiedName() string {
return fmt.Sprintf(
"%s.%s",
ti.schema,
sql.EscapeName(ti.table, false, constants.MSSQL),
sql.EscapeName(ti.table, constants.MSSQL),
)
}
4 changes: 0 additions & 4 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func (s *Store) Label() constants.DestinationKind {
return constants.Redshift
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
const (
describeNameCol = "column_name"
Expand Down
16 changes: 7 additions & 9 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/s3lib"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error {
// Redshift always creates a temporary table.
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion clients/redshift/tableid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func (ti TableIdentifier) FullyQualifiedName() string {
return fmt.Sprintf(
"%s.%s",
ti.schema,
sql.EscapeName(ti.table, false, constants.Redshift),
sql.EscapeName(ti.table, constants.Redshift),
)
}
16 changes: 7 additions & 9 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/columns"
)

Expand All @@ -29,14 +28,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())

createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Keys that exist in CDC stream, but not in DWH
Expand Down
19 changes: 8 additions & 11 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Columns that are missing in DWH, but exist in our CDC stream.
Expand All @@ -60,7 +59,6 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
ColumnOp: constants.Delete,
ContainOtherOperations: tableData.ContainOtherOperations(),
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
}

Expand Down Expand Up @@ -122,11 +120,10 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
TableID: tableID,
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig().IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(dwh.ShouldUppercaseEscapedNames(), dwh.Label()),
PrimaryKeys: tableData.PrimaryKeys(dwh.Label()),
Columns: tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.TopicConfig().SoftDelete,
DestKind: dwh.Label(),
UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()),
ContainsHardDeletes: ptr.ToBool(tableData.ContainsHardDeletes()),
}

Expand Down
1 change: 0 additions & 1 deletion clients/shared/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (MockDWH) PrepareTemporaryTable(tableData *optimization.TableData, tableCon
func (MockDWH) IdentifierFor(topicConfig kafkalib.TopicConfig, name string) types.TableIdentifier {
panic("not implemented")
}
func (MockDWH) ShouldUppercaseEscapedNames() bool { return true }

type MockTableIdentifier struct{ fqName string }

Expand Down
2 changes: 1 addition & 1 deletion clients/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
return fmt.Errorf("failed to escape default value: %w", err)
}

escapedCol := column.Name(dwh.ShouldUppercaseEscapedNames(), dwh.Label())
escapedCol := column.Name(dwh.Label())

// TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause.
// Once we escape everything by default, we can remove this patch of code.
Expand Down
8 changes: 2 additions & 6 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func (s *Store) Label() constants.DestinationKind {
return constants.Snowflake
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return true
}

func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
if s == nil {
return nil
Expand Down Expand Up @@ -133,7 +129,7 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
pkCol := columns.NewColumn(pk, typing.Invalid)
primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.ShouldUppercaseEscapedNames(), s.Label()))
primaryKeysEscaped = append(primaryKeysEscaped, pkCol.Name(s.Label()))
}

orderColsToIterate := primaryKeysEscaped
Expand All @@ -146,7 +142,7 @@ func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentif
orderByCols = append(orderByCols, fmt.Sprintf("%s ASC", pk))
}

temporaryTableName := sql.EscapeName(stagingTableID.Table(), s.ShouldUppercaseEscapedNames(), s.Label())
temporaryTableName := sql.EscapeName(stagingTableID.Table(), s.Label())
var parts []string
parts = append(parts, fmt.Sprintf("CREATE OR REPLACE TRANSIENT TABLE %s AS (SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY by %s ORDER BY %s) = 2)",
temporaryTableName,
Expand Down
18 changes: 8 additions & 10 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/values"
Expand Down Expand Up @@ -49,14 +48,13 @@ func castColValStaging(colVal any, colKind columns.Column, additionalDateFmts []
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down Expand Up @@ -85,7 +83,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
// COPY the CSV file (in Snowflake) into a table
copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)",
tempTableID.FullyQualifiedName(),
strings.Join(tableData.ReadOnlyInMemoryCols().GetEscapedColumnsToUpdate(s.ShouldUppercaseEscapedNames(), s.Label()), ","),
strings.Join(tableData.ReadOnlyInMemoryCols().GetEscapedColumnsToUpdate(s.Label()), ","),
escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%"))

if additionalSettings.AdditionalCopyClause != "" {
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/tableid.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func (ti TableIdentifier) FullyQualifiedName() string {
"%s.%s.%s",
ti.database,
ti.schema,
sql.EscapeName(ti.table, true, constants.Snowflake),
sql.EscapeName(ti.table, constants.Snowflake),
)
}
2 changes: 1 addition & 1 deletion clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
var additionalEqualityStrings []string
if len(tableData.TopicConfig().AdditionalMergePredicates) > 0 {
for _, additionalMergePredicate := range tableData.TopicConfig().AdditionalMergePredicates {
mergePredicateCol := sql.EscapeName(additionalMergePredicate.PartitionField, s.ShouldUppercaseEscapedNames(), s.Label())
mergePredicateCol := sql.EscapeName(additionalMergePredicate.PartitionField, s.Label())
additionalEqualityStrings = append(additionalEqualityStrings, fmt.Sprintf("c.%s = cc.%s", mergePredicateCol, mergePredicateCol))
}
}
Expand Down
9 changes: 2 additions & 7 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type AlterTableArgs struct {
TableID types.TableIdentifier
CreateTable bool
TemporaryTable bool
UppercaseEscNames *bool

ColumnOp constants.ColumnOperation
Mode config.Mode
Expand All @@ -69,10 +68,6 @@ func (a AlterTableArgs) Validate() error {
}
}

if a.UppercaseEscNames == nil {
return fmt.Errorf("uppercaseEscNames cannot be nil")
}

return nil
}

Expand Down Expand Up @@ -104,7 +99,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error {
mutateCol = append(mutateCol, col)
switch a.ColumnOp {
case constants.Add:
colName := col.Name(*a.UppercaseEscNames, a.Dwh.Label())
colName := col.Name(a.Dwh.Label())

if col.PrimaryKey() && a.Mode != config.History {
// Don't create a PK for history mode because it's append-only, so the primary key should not be enforced.
Expand All @@ -113,7 +108,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error {

colSQLParts = append(colSQLParts, fmt.Sprintf(`%s %s`, colName, typing.KindToDWHType(col.KindDetails, a.Dwh.Label(), col.PrimaryKey())))
case constants.Delete:
colSQLParts = append(colSQLParts, col.Name(*a.UppercaseEscNames, a.Dwh.Label()))
colSQLParts = append(colSQLParts, col.Name(a.Dwh.Label()))
}
}

Expand Down
Loading

0 comments on commit 4bfd93c

Please sign in to comment.