Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 3, 2024
1 parent d51b538 commit 0b5ecdf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
12 changes: 6 additions & 6 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func (d DatabricksDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableI
orderByCols = append(orderByCols, fmt.Sprintf("%s ASC", pk))
}

stagingTableQuery := fmt.Sprintf(`
CREATE TABLE %s AS
SELECT *
FROM %s
QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2
`, stagingTableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeysEscaped, ", "), strings.Join(orderByCols, ", "))
stagingTableQuery := fmt.Sprintf(`CREATE TABLE %s AS SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2`,
stagingTableID.FullyQualifiedName(),
tableID.FullyQualifiedName(),
strings.Join(primaryKeysEscaped, ", "),
strings.Join(orderByCols, ", "),
)

var whereClauses []string
for _, primaryKeyEscaped := range primaryKeysEscaped {
Expand Down
47 changes: 35 additions & 12 deletions clients/databricks/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,40 @@ func TestDatabricksDialect_BuildDedupeQueries(t *testing.T) {
fakeStagingTableID := &mocks.FakeTableIdentifier{}
fakeStagingTableID.FullyQualifiedNameReturns("{STAGING}")

queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, true)
assert.Len(t, queries, 3)

expectedTempViewQuery := `
CREATE TABLE {STAGING} AS
SELECT *
FROM {TARGET}
QUALIFY ROW_NUMBER() OVER (PARTITION BY ` + dialect.QuoteIdentifier("id") + ` ORDER BY ` + dialect.QuoteIdentifier("id") + ` ASC, ` + dialect.QuoteIdentifier("__artie_updated_at") + ` ASC) = 2
`
{
// includeArtieUpdatedAt = true
queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, true)
assert.Len(t, queries, 3)
assert.Equal(t,
fmt.Sprintf("CREATE TABLE {STAGING} AS SELECT * FROM {TARGET} QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s ASC, %s ASC) = 2",
dialect.QuoteIdentifier("id"),
dialect.QuoteIdentifier("id"),
dialect.QuoteIdentifier(constants.UpdateColumnMarker),
),
queries[0])
assert.Equal(t,
fmt.Sprintf("DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1.%s = t2.%s)",
dialect.QuoteIdentifier("id"),
dialect.QuoteIdentifier("id"),
),
queries[1])
assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2])
}
{
// includeArtieUpdatedAt = false
queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, false)
assert.Len(t, queries, 3)
assert.Equal(t,
fmt.Sprintf("CREATE TABLE {STAGING} AS SELECT * FROM {TARGET} QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s ASC) = 2",
dialect.QuoteIdentifier("id"),
dialect.QuoteIdentifier("id")),
queries[0])
assert.Equal(t,
fmt.Sprintf("DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1.%s = t2.%s)",
dialect.QuoteIdentifier("id"),
dialect.QuoteIdentifier("id")),
queries[1])
assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2])
}

assert.Equal(t, expectedTempViewQuery, queries[0])
assert.Equal(t, "DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1."+dialect.QuoteIdentifier("id")+" = t2."+dialect.QuoteIdentifier("id")+")", queries[1])
assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2])
}

0 comments on commit 0b5ecdf

Please sign in to comment.