Skip to content

Commit

Permalink
Refactor Redshift Append (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 2, 2024
1 parent efec8fd commit e45f3f8
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 81 deletions.
12 changes: 0 additions & 12 deletions clients/bigquery/append.go

This file was deleted.

4 changes: 4 additions & 0 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Store struct {
db.Store
}

// Append hands tableData to shared.Append for this BigQuery store, supplying
// zero-value additional settings.
func (s *Store) Append(tableData *optimization.TableData) error {
	var settings types.AdditionalSettings
	return shared.Append(s, tableData, settings)
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Expand Down
3 changes: 1 addition & 2 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error {
}

// Append appends tableData to the MS SQL destination by delegating to shared.Append.
// NOTE(review): this listing interleaves both sides of the diff (the file header reports 1 addition and
// 2 deletions). The tableID / types.AppendOpts lines appear to be the removed version and the
// types.AdditionalSettings{} return the added one — confirm against the actual file.
func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID})
return shared.Append(s, tableData, types.AdditionalSettings{})
}

// specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name.
Expand Down
13 changes: 13 additions & 0 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type Store struct {
db.Store
}

// Append hands tableData to shared.Append for this Redshift store, supplying
// zero-value additional settings.
func (s *Store) Append(tableData *optimization.TableData) error {
	var settings types.AdditionalSettings
	return shared.Append(s, tableData, settings)
}

// Merge delegates to shared.Merge using the store's config, with merge parts
// and sub-query deduplication both enabled.
func (s *Store) Merge(tableData *optimization.TableData) error {
	mergeOpts := types.MergeOpts{
		UseMergeParts: true,
		// Extra guardrail: SELECT DISTINCT is applied to the temporary table.
		// Redshift does not enforce row uniqueness, and LOAD errors could
		// otherwise leave duplicate rows behind.
		SubQueryDedupe: true,
	}

	return shared.Merge(s, tableData, s.config, mergeOpts)
}

// IdentifierFor returns the table identifier built by NewTableIdentifier from
// the topic config's schema and the given table name.
func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier {
	schema := topicConfig.Schema
	return NewTableIdentifier(schema, table)
}
Expand Down
27 changes: 14 additions & 13 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ import (
"github.com/artie-labs/transfer/lib/s3lib"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error {
// Redshift always creates a temporary table.
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
}

fp, err := s.loadTemporaryTable(tableData, tempTableID)
Expand Down
34 changes: 0 additions & 34 deletions clients/redshift/writes.go

This file was deleted.

27 changes: 17 additions & 10 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,27 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AppendOpts) error {
func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error {
if tableData.ShouldSkipUpdate() {
return nil
}

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return fmt.Errorf("failed to get table config: %w", err)
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Tc: tableConfig,
Expand All @@ -46,9 +51,11 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
}

return dwh.PrepareTemporaryTable(tableData, tableConfig, opts.TempTableID, additionalSettings, false)
return dwh.PrepareTemporaryTable(
tableData,
tableConfig,
tableID,
opts,
false,
)
}
4 changes: 1 addition & 3 deletions clients/snowflake/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ func (s *Store) Append(tableData *optimization.TableData) error {
}
}

tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
// TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing.
err = shared.Append(s, tableData, types.AppendOpts{
TempTableID: tableID,
err = shared.Append(s, tableData, types.AdditionalSettings{
AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`,
})
}
Expand Down
7 changes: 0 additions & 7 deletions lib/destination/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ type AdditionalSettings struct {
AdditionalCopyClause string
}

// AppendOpts holds the options that callers pass to `shared.Append(...)`.
// NOTE(review): the summary for this file lists 7 deletions and 0 additions, so this type appears to be
// removed by this commit — confirm against the actual file.
type AppendOpts struct {
// TempTableID - sometimes the destination requires 2 steps to append to the table (e.g. Redshift), so we'll create and load the data into a staging table.
// Redshift then has a separate step after `shared.Append(...)` to merge the two tables together.
TempTableID TableIdentifier
// AdditionalCopyClause - extra clause text supplied by the destination (the Snowflake caller passes a FILE_FORMAT / PURGE clause).
// presumably appended to the destination's COPY statement — verify against the code that consumes it.
AdditionalCopyClause string
}

type TableIdentifier interface {
Table() string
WithTable(table string) TableIdentifier
Expand Down

0 comments on commit e45f3f8

Please sign in to comment.