Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 3, 2024
1 parent e768417 commit 632a0c1
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
38 changes: 36 additions & 2 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,42 @@ func (DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, prim
panic("not implemented")
}

func (DatabricksDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string {
panic("not implemented")
// BuildDedupeQueries returns the SQL statements that remove duplicate rows from tableID,
// using stagingTableID as scratch space. The statements must be executed in the order returned:
//
//  1. Materialize into stagingTableID the row numbered 2 within each primary-key partition
//     (ordered by the primary keys and, when includeArtieUpdatedAt is set, by the
//     constants.UpdateColumnMarker column). Only keys that have at least two rows produce a staged row.
//  2. Delete every row of tableID whose primary key is present in the staging table.
//  3. Insert the staged rows back into tableID, leaving one row per previously-duplicated key.
//  4. Drop the staging table.
//
// The staging object is a real table rather than a temporary view: a view is re-evaluated on
// every read, so after step 2 it would no longer find any duplicates and step 3 would insert
// nothing, losing the affected rows. QUALIFY is used so that the row-number helper column never
// becomes part of the staged rows (which would break the column count of the INSERT), and the
// delete uses WHERE EXISTS because Databricks' DELETE does not accept a USING clause.
func (d DatabricksDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
	primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, d)

	// Build the ORDER BY list in its own slice so that appending the update marker can never
	// write into the backing array of primaryKeysEscaped.
	orderByCols := make([]string, 0, len(primaryKeysEscaped)+1)
	for _, pk := range primaryKeysEscaped {
		orderByCols = append(orderByCols, fmt.Sprintf("%s ASC", pk))
	}
	if includeArtieUpdatedAt {
		orderByCols = append(orderByCols, fmt.Sprintf("%s ASC", d.QuoteIdentifier(constants.UpdateColumnMarker)))
	}

	// Join condition matching a target row (t1) with a staged row (t2) on every primary key.
	whereClauses := make([]string, 0, len(primaryKeysEscaped))
	for _, pk := range primaryKeysEscaped {
		whereClauses = append(whereClauses, fmt.Sprintf("t1.%s = t2.%s", pk, pk))
	}

	return []string{
		fmt.Sprintf("CREATE OR REPLACE TABLE %s AS SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 2",
			stagingTableID.FullyQualifiedName(),
			tableID.FullyQualifiedName(),
			strings.Join(primaryKeysEscaped, ", "),
			strings.Join(orderByCols, ", "),
		),
		fmt.Sprintf("DELETE FROM %s t1 WHERE EXISTS (SELECT * FROM %s t2 WHERE %s)",
			tableID.FullyQualifiedName(),
			stagingTableID.FullyQualifiedName(),
			strings.Join(whereClauses, " AND "),
		),
		fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName()),
		// The staging table is only needed for the statements above.
		fmt.Sprintf("DROP TABLE IF EXISTS %s", stagingTableID.FullyQualifiedName()),
	}
}

func (d DatabricksDialect) BuildMergeQueries(
Expand Down
6 changes: 5 additions & 1 deletion clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"path/filepath"

"github.com/artie-labs/transfer/lib/destination"

_ "github.com/databricks/databricks-sql-go"
"github.com/databricks/databricks-sql-go/driverctx"

Expand Down Expand Up @@ -53,7 +55,9 @@ func (s Store) Dialect() sql.Dialect {
}

// Dedupe removes duplicate rows from tableID. It derives a staging table identifier from
// tableID via shared.TempTableID, asks the store's dialect for the dedupe statements, and
// hands them to destination.ExecStatements, returning whatever error that call reports.
func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
	stagingTableID := shared.TempTableID(tableID)
	dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt)
	return destination.ExecStatements(s, dedupeQueries)
}

func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
Expand Down

0 comments on commit 632a0c1

Please sign in to comment.