From e527a655bc74f6bfed8f177581b65c30e5ce132b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:15:56 -0700 Subject: [PATCH 1/6] Clean up. --- clients/redshift/staging.go | 28 +++++++++++++++------------- clients/redshift/writes.go | 15 +-------------- clients/shared/append.go | 11 ++++++++--- lib/destination/ddl/ddl.go | 1 + 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 5c562034c..a38fa0864 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -10,25 +10,27 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/s3lib" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error { - // Redshift always creates a temporary table. - tempAlterTableArgs := ddl.AlterTableArgs{ - Dwh: s, - Tc: tableConfig, - TableID: tempTableID, - CreateTable: true, - TemporaryTable: true, - ColumnOp: constants.Add, - Mode: tableData.Mode(), - } +func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { + if createTempTable { + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + TableID: tempTableID, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + Mode: tableData.Mode(), + } - if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { - return fmt.Errorf("failed to create temp table: %w", err) + if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table: %w", err) + } } fp, err := s.loadTemporaryTable(tableData, tempTableID) diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go index fcaf08e67..edd130b95 100644 --- a/clients/redshift/writes.go +++ b/clients/redshift/writes.go @@ -1,8 +1,6 @@ package redshift import ( - "fmt" - "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" @@ -10,18 +8,7 @@ import ( func (s *Store) Append(tableData *optimization.TableData) error { tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - - // Redshift is slightly different, we'll load and create the temporary table via shared.Append - // Then, we'll invoke `ALTER TABLE target APPEND FROM staging` to combine the diffs. - temporaryTableID := shared.TempTableID(tableID, tableData.TempTableSuffix()) - if err := shared.Append(s, tableData, types.AppendOpts{TempTableID: temporaryTableID}); err != nil { - return err - } - - _, err := s.Exec( - fmt.Sprintf(`ALTER TABLE %s APPEND FROM %s;`, tableID.FullyQualifiedName(), temporaryTableID.FullyQualifiedName()), - ) - return err + return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) } func (s *Store) Merge(tableData *optimization.TableData) error { diff --git a/clients/shared/append.go b/clients/shared/append.go index 217758183..436e81976 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -23,9 +23,14 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op } // We don't care about srcKeysMissing because we don't drop columns when we append. - _, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), - tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, - tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode()) + _, targetKeysMissing := columns.Diff( + tableData.ReadOnlyInMemoryCols(), + tableConfig.Columns(), + tableData.TopicConfig().SoftDelete, + tableData.TopicConfig().IncludeArtieUpdatedAt, + tableData.TopicConfig().IncludeDatabaseUpdatedAt, + tableData.Mode(), + ) createAlterTableArgs := ddl.AlterTableArgs{ Dwh: dwh, diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 72c193811..609aa2787 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -20,6 +20,7 @@ import ( // It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key. // Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs func DropTemporaryTable(dwh destination.DataWarehouse, fqTableName string, shouldReturnError bool) error { + fmt.Println("dropping", fqTableName) if strings.Contains(strings.ToLower(fqTableName), constants.ArtiePrefix) { sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", fqTableName) slog.Debug("Dropping temporary table", slog.String("sql", sqlCommand)) From e1732b222d456be47f61be667f4d48d3ded41929 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:22:27 -0700 Subject: [PATCH 2/6] CLean up. --- clients/redshift/staging.go | 1 - lib/destination/ddl/ddl.go | 1 - 2 files changed, 2 deletions(-) diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index a38fa0864..68d8311d2 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -10,7 +10,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/ddl" - "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/s3lib" diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 609aa2787..72c193811 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -20,7 +20,6 @@ import ( // It has a safety check to make sure the tableName contains the `constants.ArtiePrefix` key. // Temporary tables look like this: database.schema.tableName__artie__RANDOM_STRING(5)_expiryUnixTs func DropTemporaryTable(dwh destination.DataWarehouse, fqTableName string, shouldReturnError bool) error { - fmt.Println("dropping", fqTableName) if strings.Contains(strings.ToLower(fqTableName), constants.ArtiePrefix) { sqlCommand := fmt.Sprintf("DROP TABLE IF EXISTS %s", fqTableName) slog.Debug("Dropping temporary table", slog.String("sql", sqlCommand)) From d7b0701975cde6a443d8d9df0ef40d69d519f817 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:43:10 -0700 Subject: [PATCH 3/6] Clean up. --- clients/bigquery/append.go | 12 ------------ clients/bigquery/bigquery.go | 4 ++++ clients/mssql/store.go | 3 +-- clients/redshift/writes.go | 3 +-- clients/shared/append.go | 16 ++++++++++------ clients/snowflake/writes.go | 2 -- lib/destination/types/types.go | 3 --- 7 files changed, 16 insertions(+), 27 deletions(-) delete mode 100644 clients/bigquery/append.go diff --git a/clients/bigquery/append.go b/clients/bigquery/append.go deleted file mode 100644 index 8e4e221b0..000000000 --- a/clients/bigquery/append.go +++ /dev/null @@ -1,12 +0,0 @@ -package bigquery - -import ( - "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/optimization" -) - -func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) -} diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index f17802fc3..fddc25871 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -41,6 +41,10 @@ type Store struct { db.Store } +func (s *Store) Append(tableData *optimization.TableData) error { + return shared.Append(s, tableData, types.AppendOpts{}) +} + func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 73c6abb05..33577e96d 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -44,8 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { } func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) + return shared.Append(s, tableData, types.AppendOpts{}) } // specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name. diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go index edd130b95..44e2c1f98 100644 --- a/clients/redshift/writes.go +++ b/clients/redshift/writes.go @@ -7,8 +7,7 @@ import ( ) func (s *Store) Append(tableData *optimization.TableData) error { - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) - return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID}) + return shared.Append(s, tableData, types.AppendOpts{}) } func (s *Store) Merge(tableData *optimization.TableData) error { diff --git a/clients/shared/append.go b/clients/shared/append.go index 436e81976..ec018736e 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -16,7 +16,6 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return nil } - tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) tableConfig, err := dwh.GetTableConfig(tableData) if err != nil { return fmt.Errorf("failed to get table config: %w", err) @@ -32,6 +31,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op tableData.Mode(), ) + tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) createAlterTableArgs := ddl.AlterTableArgs{ Dwh: dwh, Tc: tableConfig, @@ -51,9 +51,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } - additionalSettings := types.AdditionalSettings{ - AdditionalCopyClause: opts.AdditionalCopyClause, - } - - return dwh.PrepareTemporaryTable(tableData, tableConfig, opts.TempTableID, additionalSettings, false) + return dwh.PrepareTemporaryTable( + tableData, + tableConfig, + tableID, + types.AdditionalSettings{ + AdditionalCopyClause: opts.AdditionalCopyClause, + }, + false, + ) } diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index f5f48f0b9..1b2420e6e 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -25,10 +25,8 @@ func (s *Store) Append(tableData *optimization.TableData) error { } } - tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. err = shared.Append(s, tableData, types.AppendOpts{ - TempTableID: tableID, AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, }) } diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 79f5df7a8..f59bccb38 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -44,9 +44,6 @@ type AdditionalSettings struct { } type AppendOpts struct { - // TempTableID - sometimes the destination requires 2 steps to append to the table (e.g. Redshift), so we'll create and load the data into a staging table - // Redshift then has a separate step after `shared.Append(...)` to merge the two tables together. - TempTableID TableIdentifier AdditionalCopyClause string } From 28b565f92f3265f5be305757899106e15db4e810 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:47:22 -0700 Subject: [PATCH 4/6] Lint. --- clients/shared/append.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clients/shared/append.go b/clients/shared/append.go index ec018736e..16b9fbba7 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -51,13 +51,15 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } + additionalSettings := types.AdditionalSettings{ + AdditionalCopyClause: opts.AdditionalCopyClause, + } + return dwh.PrepareTemporaryTable( tableData, tableConfig, tableID, - types.AdditionalSettings{ - AdditionalCopyClause: opts.AdditionalCopyClause, - }, + additionalSettings, false, ) } From 620638a0d2d9bc5e973367ee6b9afb87b47c5d01 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:49:45 -0700 Subject: [PATCH 5/6] Lint --- clients/bigquery/bigquery.go | 2 +- clients/mssql/store.go | 2 +- clients/redshift/writes.go | 2 +- clients/shared/append.go | 8 ++------ clients/snowflake/writes.go | 2 +- lib/destination/types/types.go | 4 ---- 6 files changed, 6 insertions(+), 14 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index fddc25871..5d6fec060 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -42,7 +42,7 @@ type Store struct { } func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AppendOpts{}) + return shared.Append(s, tableData, types.AdditionalSettings{}) } func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 33577e96d..81265d188 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -44,7 +44,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { } func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AppendOpts{}) + return shared.Append(s, tableData, types.AdditionalSettings{}) } // specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name. diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go index 44e2c1f98..c910f6134 100644 --- a/clients/redshift/writes.go +++ b/clients/redshift/writes.go @@ -7,7 +7,7 @@ import ( ) func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AppendOpts{}) + return shared.Append(s, tableData, types.AdditionalSettings{}) } func (s *Store) Merge(tableData *optimization.TableData) error { diff --git a/clients/shared/append.go b/clients/shared/append.go index 16b9fbba7..d3d9bfecb 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -11,7 +11,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AppendOpts) error { +func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error { if tableData.ShouldSkipUpdate() { return nil } @@ -51,15 +51,11 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op return fmt.Errorf("failed to merge columns from destination: %w", err) } - additionalSettings := types.AdditionalSettings{ - AdditionalCopyClause: opts.AdditionalCopyClause, - } - return dwh.PrepareTemporaryTable( tableData, tableConfig, tableID, - additionalSettings, + opts, false, ) } diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index 1b2420e6e..b5ca06e98 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -26,7 +26,7 @@ func (s *Store) Append(tableData *optimization.TableData) error { } // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. - err = shared.Append(s, tableData, types.AppendOpts{ + err = shared.Append(s, tableData, types.AdditionalSettings{ AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, }) } diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index f59bccb38..9b79b62d8 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -43,10 +43,6 @@ type AdditionalSettings struct { AdditionalCopyClause string } -type AppendOpts struct { - AdditionalCopyClause string -} - type TableIdentifier interface { Table() string WithTable(table string) TableIdentifier From e3c4c64608ba98f3968c29ea41d3866906280080 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 1 May 2024 18:50:37 -0700 Subject: [PATCH 6/6] Moving writes into redshift.go. --- clients/redshift/redshift.go | 13 +++++++++++++ clients/redshift/writes.go | 20 -------------------- 2 files changed, 13 insertions(+), 20 deletions(-) delete mode 100644 clients/redshift/writes.go diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 65720e005..4626f6cc3 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -30,6 +30,19 @@ type Store struct { db.Store } +func (s *Store) Append(tableData *optimization.TableData) error { + return shared.Append(s, tableData, types.AdditionalSettings{}) +} + +func (s *Store) Merge(tableData *optimization.TableData) error { + return shared.Merge(s, tableData, s.config, types.MergeOpts{ + UseMergeParts: true, + // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. + // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. + SubQueryDedupe: true, + }) +} + func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier { return NewTableIdentifier(topicConfig.Schema, table) } diff --git a/clients/redshift/writes.go b/clients/redshift/writes.go deleted file mode 100644 index c910f6134..000000000 --- a/clients/redshift/writes.go +++ /dev/null @@ -1,20 +0,0 @@ -package redshift - -import ( - "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/optimization" -) - -func (s *Store) Append(tableData *optimization.TableData) error { - return shared.Append(s, tableData, types.AdditionalSettings{}) -} - -func (s *Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, s.config, types.MergeOpts{ - UseMergeParts: true, - // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. - // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. - SubQueryDedupe: true, - }) -}