diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 4626f6cc3..06016f28a 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -36,7 +36,6 @@ func (s *Store) Append(tableData *optimization.TableData) error { func (s *Store) Merge(tableData *optimization.TableData) error { return shared.Merge(s, tableData, s.config, types.MergeOpts{ - UseMergeParts: true, // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. SubQueryDedupe: true, diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 1651043b1..6486222ab 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -145,8 +145,8 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg mergeArg.AdditionalEqualityStrings = opts.AdditionalEqualityStrings } - if opts.UseMergeParts { - mergeParts, err := mergeArg.GetParts() + if dwh.Label() == constants.Redshift { + mergeParts, err := mergeArg.GetRedshiftStatements() if err != nil { return fmt.Errorf("failed to generate merge statement: %w", err) } diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go index f0408cef4..e5f4093bf 100644 --- a/lib/destination/dml/merge.go +++ b/lib/destination/dml/merge.go @@ -125,7 +125,7 @@ func (m *MergeArgument) buildRedshiftDeleteQuery() string { ) } -func (m *MergeArgument) GetParts() ([]string, error) { +func (m *MergeArgument) GetRedshiftStatements() ([]string, error) { if err := m.Valid(); err != nil { return nil, err } diff --git a/lib/destination/dml/merge_parts_test.go b/lib/destination/dml/merge_redshift_test.go similarity index 93% rename from lib/destination/dml/merge_parts_test.go rename to lib/destination/dml/merge_redshift_test.go index 8b392cc66..8cf95cecf 100644 --- a/lib/destination/dml/merge_parts_test.go +++ b/lib/destination/dml/merge_redshift_test.go @@ -13,12 +13,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMergeStatementPartsValidation(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements_Validation(t *testing.T) { for _, arg := range []*MergeArgument{ {Dialect: sql.SnowflakeDialect{}}, {Dialect: sql.BigQueryDialect{}}, } { - parts, err := arg.GetParts() + parts, err := arg.GetRedshiftStatements() assert.ErrorContains(t, err, "merge argument does not contain primary keys") assert.Nil(t, parts) } @@ -60,7 +60,7 @@ func getBasicColumnsForTest(compositeKey bool) result { } } -func TestMergeStatementParts_SkipDelete(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements_SkipDelete(t *testing.T) { // Biggest difference with this test are: // 1. We are not saving `__artie_deleted` column // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) @@ -76,7 +76,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) { ContainsHardDeletes: ptr.ToBool(false), } - parts, err := mergeArg.GetParts() + parts, err := mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 2, len(parts)) @@ -89,7 +89,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) { parts[1]) } -func TestMergeStatementPartsSoftDelete(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements_SoftDelete(t *testing.T) { fqTableName := "public.tableName" tempTableName := "public.tableName__temp" res := getBasicColumnsForTest(false) @@ -103,7 +103,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) { ContainsHardDeletes: ptr.ToBool(false), } - parts, err := mergeArg.GetParts() + parts, err := mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 2, len(parts)) @@ -115,7 +115,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) { parts[1]) mergeArg.IdempotentKey = "created_at" - parts, err = mergeArg.GetParts() + parts, err = mergeArg.GetRedshiftStatements() assert.NoError(t, err) // Parts[0] for insertion should be identical @@ -128,7 +128,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) { parts[1]) } -func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements_SoftDeleteComposite(t *testing.T) { fqTableName := "public.tableName" tempTableName := "public.tableName__temp" res := getBasicColumnsForTest(true) @@ -142,7 +142,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { ContainsHardDeletes: ptr.ToBool(false), } - parts, err := mergeArg.GetParts() + parts, err := mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 2, len(parts)) @@ -154,7 +154,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { parts[1]) mergeArg.IdempotentKey = "created_at" - parts, err = mergeArg.GetParts() + parts, err = mergeArg.GetRedshiftStatements() assert.NoError(t, err) // Parts[0] for insertion should be identical @@ -167,7 +167,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { parts[1]) } -func TestMergeStatementParts(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements(t *testing.T) { // Biggest difference with this test are: // 1. We are not saving `__artie_deleted` column // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) @@ -183,7 +183,7 @@ func TestMergeStatementParts(t *testing.T) { ContainsHardDeletes: ptr.ToBool(true), } - parts, err := mergeArg.GetParts() + parts, err := mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 3, len(parts)) @@ -209,7 +209,7 @@ func TestMergeStatementParts(t *testing.T) { ContainsHardDeletes: ptr.ToBool(true), } - parts, err = mergeArg.GetParts() + parts, err = mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 3, len(parts)) @@ -226,7 +226,7 @@ func TestMergeStatementParts(t *testing.T) { parts[2]) } -func TestMergeStatementPartsCompositeKey(t *testing.T) { +func TestMergeArgument_GetRedshiftStatements_CompositeKey(t *testing.T) { fqTableName := "public.tableName" tempTableName := "public.tableName__temp" res := getBasicColumnsForTest(true) @@ -239,7 +239,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { ContainsHardDeletes: ptr.ToBool(true), } - parts, err := mergeArg.GetParts() + parts, err := mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 3, len(parts)) @@ -265,7 +265,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) { IdempotentKey: "created_at", } - parts, err = mergeArg.GetParts() + parts, err = mergeArg.GetRedshiftStatements() assert.NoError(t, err) assert.Equal(t, 3, len(parts)) diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 9b79b62d8..3fa432c93 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -33,7 +33,6 @@ func (d *DwhToTablesConfigMap) AddTableToConfig(tableID TableIdentifier, config } type MergeOpts struct { - UseMergeParts bool SubQueryDedupe bool AdditionalEqualityStrings []string RetryColBackfill bool