diff --git a/clients/bigquery/append.go b/clients/bigquery/append.go deleted file mode 100644 index 8e4e221b0..000000000 --- a/clients/bigquery/append.go +++ /dev/null @@ -1,12 +0,0 @@ -package bigquery - -import ( - "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/optimization" -) - -func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) -} diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index f17802fc3..5d6fec060 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -41,6 +41,10 @@ type Store struct { db.Store } +func (s *Store) Append(tableData *optimization.TableData) error { + return shared.Append(s, tableData, types.AdditionalSettings{}) +} + func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 73c6abb05..81265d188 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -44,8 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { } func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) + return shared.Append(s, tableData, types.AdditionalSettings{}) } // specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name. diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 65720e005..4626f6cc3 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -30,6 +30,19 @@ type Store struct { db.Store } +func (s *Store) Append(tableData *optimization.TableData) error { + return shared.Append(s, tableData, types.AdditionalSettings{}) +} + +func (s *Store) Merge(tableData *optimization.TableData) error { + return shared.Merge(s, tableData, s.config, types.MergeOpts{ + UseMergeParts: true, + // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. + // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. + SubQueryDedupe: true, + }) +} + func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier { return NewTableIdentifier(topicConfig.Schema, table) } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 5c562034c..68d8311d2 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -15,20 +15,21 @@ import ( "github.com/artie-labs/transfer/lib/s3lib" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error { - // Redshift always creates a temporary table. - tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - Mode: tableData.Mode(), - } +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { + if createTempTable { + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), + } - if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { - return fmt.Errorf("failed to create temp table: %w", err) + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table: %w", err) + } } fp, err := s.loadTemporaryTable(tableData, tempTableID) diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go deleted file mode 100644 index fcaf08e67..000000000 --- a/clients/redshift/writes.go +++ /dev/null @@ -1,34 +0,0 @@ -package redshift - -import ( - "fmt" - - "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/optimization" -) - -func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - - // Redshift is slightly different, we'll load and create the temporary table via shared.Append - // Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs. - temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix()) - if err := shared.Append(s, tableData, types.AppendOpts{TempTableID: temporaryTableID}); err != nil { - return err - } - - _, err := s.Exec( - fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()), - ) - return err -} - -func (s *Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, s.config, types.MergeOpts{ - UseMergeParts: true, - // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. - // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. - SubQueryDedupe: true, - }) -} diff --git a/clients/shared/append.go b/clients/shared/append.go index 217758183..d3d9bfecb 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -11,22 +11,27 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AppendOpts) error { +func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error { if tableData.ShouldSkipUpdate() { return nil } - tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) tableConfig, err := dwh.GetTableConfig(tableData) if err != nil { return fmt.Errorf("failed to get table config: %w", err) } // We don't care about srcKeysMissing because we don't drop columns when we append. - _, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), - tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, - tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode()) + _, targetKeysMissing := columns.Diff( + tableData.ReadOnlyInMemoryCols(), + tableConfig.Columns(), + tableData.TopicConfig().SoftDelete, + tableData.TopicConfig().IncludeArtieUpdatedAt, + tableData.TopicConfig().IncludeDatabaseUpdatedAt, + tableData.Mode(), + ) + tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) createAlterTableArgs := ddl.AlterTableArgs{ Dwh: dwh, Tc: tableConfig, @@ -46,9 +51,11 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } - additionalSettings := types.AdditionalSettings{ - AdditionalCopyClause: opts.AdditionalCopyClause, - } - - return dwh.PrepareTemporaryTable(tableData, tableConfig, opts.TempTableID, additionalSettings, false) + return dwh.PrepareTemporaryTable( + tableData, + tableConfig, + tableID, + opts, + false, + ) } diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index f5f48f0b9..b5ca06e98 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -25,10 +25,8 @@ func (s *Store) Append(tableData *optimization.TableData) error { } } - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. - err = shared.Append(s, tableData, types.AppendOpts{ - TempTableID: tableID, + err = shared.Append(s, tableData, types.AdditionalSettings{ AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, }) } diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 79f5df7a8..9b79b62d8 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -43,13 +43,6 @@ type AdditionalSettings struct { AdditionalCopyClause string } -type AppendOpts struct { - // TempTableID - sometimes the destination requires 2 steps to append to the table (e.g. Redshift), so we'll create and load the data into a staging table - // Redshift then has a separate step after `shared.Append(...)` to merge the two tables together. - TempTableID TableIdentifier - AdditionalCopyClause string -} - type TableIdentifier interface { Table() string WithTable(table string) TableIdentifier