From 0b5ecdfe63ce5d09a943aa6e7ae3b6efc54414ce Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 3 Oct 2024 16:06:32 -0700 Subject: [PATCH] Clean up. --- clients/databricks/dialect/dialect.go | 12 +++--- clients/databricks/dialect/dialect_test.go | 47 ++++++++++++++++------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index c43c6d534..52214977a 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -62,12 +62,12 @@ func (d DatabricksDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableI orderByCols = append(orderByCols, fmt.Sprintf("%s ASC", pk)) } - stagingTableQuery := fmt.Sprintf(` - CREATE TABLE %s AS - SELECT * - FROM %s - QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2 - `, stagingTableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeysEscaped, ", "), strings.Join(orderByCols, ", ")) + stagingTableQuery := fmt.Sprintf(`CREATE TABLE %s AS SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2`, + stagingTableID.FullyQualifiedName(), + tableID.FullyQualifiedName(), + strings.Join(primaryKeysEscaped, ", "), + strings.Join(orderByCols, ", "), + ) var whereClauses []string for _, primaryKeyEscaped := range primaryKeysEscaped { diff --git a/clients/databricks/dialect/dialect_test.go b/clients/databricks/dialect/dialect_test.go index 7a196b70e..35b6b078a 100644 --- a/clients/databricks/dialect/dialect_test.go +++ b/clients/databricks/dialect/dialect_test.go @@ -87,17 +87,40 @@ func TestDatabricksDialect_BuildDedupeQueries(t *testing.T) { fakeStagingTableID := &mocks.FakeTableIdentifier{} fakeStagingTableID.FullyQualifiedNameReturns("{STAGING}") - queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, true) - assert.Len(t, queries, 3) - - expectedTempViewQuery := ` - CREATE TABLE {STAGING} AS - SELECT * - FROM {TARGET} - QUALIFY ROW_NUMBER() OVER (PARTITION BY ` + dialect.QuoteIdentifier("id") + ` ORDER BY ` + dialect.QuoteIdentifier("id") + ` ASC, ` + dialect.QuoteIdentifier("__artie_updated_at") + ` ASC) = 2 - ` + { + // includeArtieUpdatedAt = true + queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, true) + assert.Len(t, queries, 3) + assert.Equal(t, + fmt.Sprintf("CREATE TABLE {STAGING} AS SELECT * FROM {TARGET} QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s ASC, %s ASC) = 2", + dialect.QuoteIdentifier("id"), + dialect.QuoteIdentifier("id"), + dialect.QuoteIdentifier(constants.UpdateColumnMarker), + ), + queries[0]) + assert.Equal(t, + fmt.Sprintf("DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1.%s = t2.%s)", + dialect.QuoteIdentifier("id"), + dialect.QuoteIdentifier("id"), + ), + queries[1]) + assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2]) + } + { + // includeArtieUpdatedAt = false + queries := dialect.BuildDedupeQueries(fakeTableID, fakeStagingTableID, []string{"id"}, false) + assert.Len(t, queries, 3) + assert.Equal(t, + fmt.Sprintf("CREATE TABLE {STAGING} AS SELECT * FROM {TARGET} QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s ASC) = 2", + dialect.QuoteIdentifier("id"), + dialect.QuoteIdentifier("id")), + queries[0]) + assert.Equal(t, + fmt.Sprintf("DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1.%s = t2.%s)", + dialect.QuoteIdentifier("id"), + dialect.QuoteIdentifier("id")), + queries[1]) + assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2]) + } - assert.Equal(t, expectedTempViewQuery, queries[0]) - assert.Equal(t, "DELETE FROM {TARGET} t1 WHERE EXISTS (SELECT * FROM {STAGING} t2 WHERE t1."+dialect.QuoteIdentifier("id")+" = t2."+dialect.QuoteIdentifier("id")+")", queries[1]) - assert.Equal(t, "INSERT INTO {TARGET} SELECT * FROM {STAGING}", queries[2]) }