Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 2, 2024
1 parent e1732b2 commit d7b0701
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 27 deletions.
12 changes: 0 additions & 12 deletions clients/bigquery/append.go

This file was deleted.

4 changes: 4 additions & 0 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Store struct {
db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
return shared.Append(s, tableData, types.AppendOpts{})
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Expand Down
3 changes: 1 addition & 2 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
}

func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID})
return shared.Append(s, tableData, types.AppendOpts{})
}

// specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name.
Expand Down
3 changes: 1 addition & 2 deletions clients/redshift/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
)

func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID})
return shared.Append(s, tableData, types.AppendOpts{})
}

func (s *Store) Merge(tableData *optimization.TableData) error {
Expand Down
16 changes: 10 additions & 6 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
return nil
}

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return fmt.Errorf("failed to get table config: %w", err)
Expand All @@ -32,6 +31,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Tc: tableConfig,
Expand All @@ -51,9 +51,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
}

return dwh.PrepareTemporaryTable(tableData, tableConfig, opts.TempTableID, additionalSettings, false)
return dwh.PrepareTemporaryTable(
tableData,
tableConfig,
tableID,
types.AdditionalSettings{

Check failure on line 58 in clients/shared/append.go

View workflow job for this annotation

GitHub Actions / test

should convert opts (type github.com/artie-labs/transfer/lib/destination/types.AppendOpts) to github.com/artie-labs/transfer/lib/destination/types.AdditionalSettings instead of using struct literal (S1016)
AdditionalCopyClause: opts.AdditionalCopyClause,
},
false,
)
}
2 changes: 0 additions & 2 deletions clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ func (s *Store) Append(tableData *optimization.TableData) error {
}
}

tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
// TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing.
err = shared.Append(s, tableData, types.AppendOpts{
TempTableID: tableID,
AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`,
})
}
Expand Down
3 changes: 0 additions & 3 deletions lib/destination/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type AdditionalSettings struct {
}

type AppendOpts struct {
// TempTableID - sometimes the destination requires 2 steps to append to the table (e.g. Redshift), so we'll create and load the data into a staging table
// Redshift then has a separate step after `shared.Append(...)` to merge the two tables together.
TempTableID TableIdentifier
AdditionalCopyClause string
}

Expand Down

0 comments on commit d7b0701

Please sign in to comment.