Skip to content

Commit

Permalink
[dml] Rename MergeArgument.GetParts (#570)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 4, 2024
1 parent 226645e commit 8363028
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 21 deletions.
1 change: 0 additions & 1 deletion clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (s *Store) Append(tableData *optimization.TableData) error {

func (s *Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, s.config, types.MergeOpts{
UseMergeParts: true,
// We are adding SELECT DISTINCT here for the temporary table as an extra guardrail.
// Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise.
SubQueryDedupe: true,
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
mergeArg.AdditionalEqualityStrings = opts.AdditionalEqualityStrings
}

if opts.UseMergeParts {
mergeParts, err := mergeArg.GetParts()
if dwh.Label() == constants.Redshift {
mergeParts, err := mergeArg.GetRedshiftStatements()
if err != nil {
return fmt.Errorf("failed to generate merge statement: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/dml/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (m *MergeArgument) buildRedshiftDeleteQuery() string {
)
}

func (m *MergeArgument) GetParts() ([]string, error) {
func (m *MergeArgument) GetRedshiftStatements() ([]string, error) {
if err := m.Valid(); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"github.com/stretchr/testify/assert"
)

func TestMergeStatementPartsValidation(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements_Validation(t *testing.T) {
for _, arg := range []*MergeArgument{
{Dialect: sql.SnowflakeDialect{}},
{Dialect: sql.BigQueryDialect{}},
} {
parts, err := arg.GetParts()
parts, err := arg.GetRedshiftStatements()
assert.ErrorContains(t, err, "merge argument does not contain primary keys")
assert.Nil(t, parts)
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func getBasicColumnsForTest(compositeKey bool) result {
}
}

func TestMergeStatementParts_SkipDelete(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements_SkipDelete(t *testing.T) {
// Biggest difference with this test are:
// 1. We are not saving `__artie_deleted` column
// 2. There are 3 SQL queries (INSERT, UPDATE and DELETE)
Expand All @@ -76,7 +76,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(false),
}

parts, err := mergeArg.GetParts()
parts, err := mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

Expand All @@ -89,7 +89,7 @@ func TestMergeStatementParts_SkipDelete(t *testing.T) {
parts[1])
}

func TestMergeStatementPartsSoftDelete(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements_SoftDelete(t *testing.T) {
fqTableName := "public.tableName"
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(false)
Expand All @@ -103,7 +103,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(false),
}

parts, err := mergeArg.GetParts()
parts, err := mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

Expand All @@ -115,7 +115,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) {
parts[1])

mergeArg.IdempotentKey = "created_at"
parts, err = mergeArg.GetParts()
parts, err = mergeArg.GetRedshiftStatements()
assert.NoError(t, err)

// Parts[0] for insertion should be identical
Expand All @@ -128,7 +128,7 @@ func TestMergeStatementPartsSoftDelete(t *testing.T) {
parts[1])
}

func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements_SoftDeleteComposite(t *testing.T) {
fqTableName := "public.tableName"
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(true)
Expand All @@ -142,7 +142,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(false),
}

parts, err := mergeArg.GetParts()
parts, err := mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

Expand All @@ -154,7 +154,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) {
parts[1])

mergeArg.IdempotentKey = "created_at"
parts, err = mergeArg.GetParts()
parts, err = mergeArg.GetRedshiftStatements()
assert.NoError(t, err)

// Parts[0] for insertion should be identical
Expand All @@ -167,7 +167,7 @@ func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) {
parts[1])
}

func TestMergeStatementParts(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements(t *testing.T) {
// Biggest difference with this test are:
// 1. We are not saving `__artie_deleted` column
// 2. There are 3 SQL queries (INSERT, UPDATE and DELETE)
Expand All @@ -183,7 +183,7 @@ func TestMergeStatementParts(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(true),
}

parts, err := mergeArg.GetParts()
parts, err := mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

Expand All @@ -209,7 +209,7 @@ func TestMergeStatementParts(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(true),
}

parts, err = mergeArg.GetParts()
parts, err = mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

Expand All @@ -226,7 +226,7 @@ func TestMergeStatementParts(t *testing.T) {
parts[2])
}

func TestMergeStatementPartsCompositeKey(t *testing.T) {
func TestMergeArgument_GetRedshiftStatements_CompositeKey(t *testing.T) {
fqTableName := "public.tableName"
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(true)
Expand All @@ -239,7 +239,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) {
ContainsHardDeletes: ptr.ToBool(true),
}

parts, err := mergeArg.GetParts()
parts, err := mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

Expand All @@ -265,7 +265,7 @@ func TestMergeStatementPartsCompositeKey(t *testing.T) {
IdempotentKey: "created_at",
}

parts, err = mergeArg.GetParts()
parts, err = mergeArg.GetRedshiftStatements()
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

Expand Down
1 change: 0 additions & 1 deletion lib/destination/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func (d *DwhToTablesConfigMap) AddTableToConfig(tableID TableIdentifier, config
}

type MergeOpts struct {
UseMergeParts bool
SubQueryDedupe bool
AdditionalEqualityStrings []string
RetryColBackfill bool
Expand Down

0 comments on commit 8363028

Please sign in to comment.