diff --git a/clients/bigquery/append.go b/clients/bigquery/append.go index 22b18e5c8..8e4e221b0 100644 --- a/clients/bigquery/append.go +++ b/clients/bigquery/append.go @@ -8,5 +8,5 @@ import ( func (s *Store) Append(tableData *optimization.TableData) error { tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, s.config, types.AppendOpts{TempTableID: tableID}) + return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) } diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 98490bb3b..02daf0791 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -44,7 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { func (s *Store) Append(tableData *optimization.TableData) error { tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, s.config, types.AppendOpts{TempTableID: tableID}) + return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) } // specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name. diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go index 167e34da2..fcaf08e67 100644 --- a/clients/redshift/writes.go +++ b/clients/redshift/writes.go @@ -14,7 +14,7 @@ func (s *Store) Append(tableData *optimization.TableData) error { // Redshift is slightly different, we'll load and create the temporary table via shared.Append // Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs. temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix()) - if err := shared.Append(s, tableData, s.config, types.AppendOpts{TempTableID: temporaryTableID}); err != nil { + if err := shared.Append(s, tableData, types.AppendOpts{TempTableID: temporaryTableID}); err != nil { return err } diff --git a/clients/shared/append.go b/clients/shared/append.go index 5437efb43..21c755194 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -3,16 +3,16 @@ package shared import ( "log/slog" - "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" + "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing/columns" ) -func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg config.Config, opts types.AppendOpts) error { +func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AppendOpts) error { if tableData.ShouldSkipUpdate() { return nil } @@ -35,7 +35,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, cf CreateTable: tableConfig.CreateTable(), ColumnOp: constants.Add, CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: &cfg.SharedDestinationConfig.UppercaseEscapedNames, + UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), Mode: tableData.Mode(), } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 6b1533659..735bd6ac5 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -42,7 +42,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg CreateTable: tableConfig.CreateTable(), ColumnOp: constants.Add, CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: &cfg.SharedDestinationConfig.UppercaseEscapedNames, + UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), Mode: tableData.Mode(), } @@ -61,7 +61,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg ColumnOp: constants.Delete, ContainOtherOperations: tableData.ContainOtherOperations(), CdcTime: tableData.LatestCDCTs, - UppercaseEscNames: &cfg.SharedDestinationConfig.UppercaseEscapedNames, + UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), Mode: tableData.Mode(), } @@ -123,11 +123,11 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg TableID: tableID, SubQuery: subQuery, IdempotentKey: tableData.TopicConfig().IdempotentKey, - PrimaryKeys: tableData.PrimaryKeys(cfg.SharedDestinationConfig.UppercaseEscapedNames, &sql.NameArgs{Escape: true, DestKind: dwh.Label()}), + PrimaryKeys: tableData.PrimaryKeys(dwh.ShouldUppercaseEscapedNames(), &sql.NameArgs{Escape: true, DestKind: dwh.Label()}), Columns: tableData.ReadOnlyInMemoryCols(), SoftDelete: tableData.TopicConfig().SoftDelete, DestKind: dwh.Label(), - UppercaseEscNames: &cfg.SharedDestinationConfig.UppercaseEscapedNames, + UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()), ContainsHardDeletes: ptr.ToBool(tableData.ContainsHardDeletes()), } diff --git a/clients/shared/utils.go b/clients/shared/utils.go index 19f13013d..2fefe7b2c 100644 --- a/clients/shared/utils.go +++ b/clients/shared/utils.go @@ -32,8 +32,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col return fmt.Errorf("failed to escape default value: %w", err) } - uppercaseEscNames := cfg.SharedDestinationConfig.UppercaseEscapedNames - escapedCol := column.Name(uppercaseEscNames, &sql.NameArgs{Escape: true, DestKind: dwh.Label()}) + escapedCol := column.Name(dwh.ShouldUppercaseEscapedNames(), &sql.NameArgs{Escape: true, DestKind: dwh.Label()}) // TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause. // Once we escape everything by default, we can remove this patch of code. diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index 81a31509d..90bdda710 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -26,7 +26,7 @@ func (s *Store) Append(tableData *optimization.TableData) error { tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. - err = shared.Append(s, tableData, s.config, types.AppendOpts{ + err = shared.Append(s, tableData, types.AppendOpts{ TempTableID: tableID, AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, })