diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 743e6dbb6..d2883c57d 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -78,7 +78,7 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err return nil } -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index 68fd19918..89a2026ec 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -13,7 +13,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 735a66a69..cd46578a0 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -14,7 +14,6 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/typing" ) type Store struct { @@ -52,6 +51,10 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap { } func (s *Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s *Store) dialect() dialect.RedshiftDialect { return dialect.RedshiftDialect{} } @@ -76,12 +79,7 @@ func (s *Store) Sweep() error { return err } - redshiftDialect, err := typing.AssertType[dialect.RedshiftDialect](s.Dialect()) - if err != nil { - return err - } - - return shared.Sweep(s, tcs, redshiftDialect.BuildSweepQuery) + return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery) } func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 9d5893f99..85de9e39b 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -17,7 +17,7 @@ import ( "github.com/artie-labs/transfer/lib/sql" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 71e32e9bb..1ce39e74a 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -76,7 +76,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } }() - if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil { + if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, tableID, types.AdditionalSettings{}, true); err != nil { return fmt.Errorf("failed to prepare temporary table: %w", err) } diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index 95a3e8575..fb46100cd 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -56,6 +56,10 @@ func (s *Store) Sweep() error { } func (s *Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s *Store) dialect() dialect.SnowflakeDialect { return dialect.SnowflakeDialect{} } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 2e58ba6d4..6a279c7b0 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -54,7 +54,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { return replaceExceededValues(value, colKind), nil } -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 44484c356..d5adb54f2 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -23,7 +23,7 @@ type DataWarehouse interface { // Helper functions for merge GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) - PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error + PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, parentTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error } type Baseline interface {