Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent fb58dbc commit 2a1b99b
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err
return nil
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand Down
12 changes: 5 additions & 7 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

type Store struct {
Expand Down Expand Up @@ -52,6 +51,10 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
}

func (s *Store) Dialect() sql.Dialect {
return s.dialect()
}

func (s *Store) dialect() dialect.RedshiftDialect {
return dialect.RedshiftDialect{}
}

Expand All @@ -76,12 +79,7 @@ func (s *Store) Sweep() error {
return err
}

redshiftDialect, err := typing.AssertType[dialect.RedshiftDialect](s.Dialect())
if err != nil {
return err
}

return shared.Sweep(s, tcs, redshiftDialect.BuildSweepQuery)
return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery)
}

func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
Expand Down
2 changes: 1 addition & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/artie-labs/transfer/lib/sql"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand Down
2 changes: 1 addition & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt
}
}()

if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil {
if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, tableID, types.AdditionalSettings{}, true); err != nil {
return fmt.Errorf("failed to prepare temporary table: %w", err)
}

Expand Down
4 changes: 4 additions & 0 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (s *Store) Sweep() error {
}

func (s *Store) Dialect() sql.Dialect {
return s.dialect()
}

func (s *Store) dialect() dialect.SnowflakeDialect {
return dialect.SnowflakeDialect{}
}

Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
return replaceExceededValues(value, colKind), nil
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DataWarehouse interface {

// Helper functions for merge
GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error)
PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error
PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, parentTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error
}

type Baseline interface {
Expand Down

0 comments on commit 2a1b99b

Please sign in to comment.