Skip to content

Commit

Permalink
Implement Incremental Merge Strategy for Clickhouse (#6069)
Browse files Browse the repository at this point in the history
* implement IncrementalReplaceStrategy

* apply replace incremental strategy for clickhouse

* cosmetic change

* cosmetic change

* fix fmt issues

* defer drop table

* Simplified and added support for distributed tables

* Simplified and added support for distributed tables

* Refactor clickhouse driver partition handling

* Validate incremental strategy for partition key presence

* Refactor table operations into helper methods

* Improve partition handling and table creation checks

* Fix case sensitivity in partition key validation.

* Fix SQL query string formatting in ClickHouse driver

* Use deterministic names for temporary tables

* Update DuckDB driver test to use new storage initialization

* Add partition overwrite incremental strategy

---------

Co-authored-by: e.sevastyanov <[email protected]>
  • Loading branch information
rohithreddykota and esevastyanov authored Dec 5, 2024
1 parent 364d21b commit d3c4c5f
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 7 deletions.
2 changes: 1 addition & 1 deletion runtime/drivers/clickhouse/model_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *ModelOutputProperties) Validate(opts *drivers.ModelExecuteOptions) erro
}

switch p.IncrementalStrategy {
case drivers.IncrementalStrategyUnspecified, drivers.IncrementalStrategyAppend:
case drivers.IncrementalStrategyUnspecified, drivers.IncrementalStrategyAppend, drivers.IncrementalStrategyPartitionOverwrite:
default:
return fmt.Errorf("invalid incremental strategy %q", p.IncrementalStrategy)
}
Expand Down
127 changes: 125 additions & 2 deletions runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clickhouse

import (
"context"
"crypto/md5"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -251,7 +252,80 @@ func (c *connection) InsertTableAsSelect(ctx context.Context, name, sql string,
})
}

// merge strategy is also not supported for clickhouse
if strategy == drivers.IncrementalStrategyPartitionOverwrite {
_, onCluster, err := informationSchema{c: c}.entityType(ctx, "", name)
if err != nil {
return err
}
onClusterClause := ""
if onCluster {
onClusterClause = "ON CLUSTER " + safeSQLName(c.config.Cluster)
}
// Get the engine info of the given table
engine, err := c.getTableEngine(ctx, name)
if err != nil {
return err
}
// Distributed table cannot be altered directly, so we need to alter the local table
if engine == "Distributed" {
name = localTableName(name)
}
// create temp table with the same schema using a deterministic name
tempName := fmt.Sprintf("__rill_temp_%s_%x", name, md5.Sum([]byte(sql)))
err = c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("CREATE OR REPLACE TABLE %s %s AS %s", safeSQLName(tempName), onClusterClause, name),
Priority: 1,
})
if err != nil {
return err
}
// clean up the temp table
defer func() {
var cancel context.CancelFunc

// If the original context is cancelled, create a new context for cleanup
if ctx.Err() != nil {
ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
} else {
cancel = func() {}
}
defer cancel()

err = c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("DROP TABLE %s %s", safeSQLName(tempName), onClusterClause),
Priority: 1,
})
if err != nil {
c.logger.Warn("clickhouse: failed to drop temp table", zap.String("name", tempName), zap.Error(err))
}
}()
// insert into temp table
err = c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("INSERT INTO %s %s", safeSQLName(tempName), sql),
Priority: 1,
})
if err != nil {
return err
}
// list partitions from the temp table
partitions, err := c.getTablePartitions(ctx, tempName)
if err != nil {
return err
}
// iterate over partitions and replace them in the main table
for _, part := range partitions {
// alter the main table to replace the partition
err = c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("ALTER TABLE %s %s REPLACE PARTITION %s FROM %s", safeSQLName(name), onClusterClause, part, safeSQLName(tempName)),
Priority: 1,
})
if err != nil {
return err
}
}
return nil
}

return fmt.Errorf("incremental insert strategy %q not supported", strategy)
}

Expand Down Expand Up @@ -477,7 +551,15 @@ func (c *connection) createTable(ctx context.Context, name, sql string, outputPr
} else {
fmt.Fprintf(&create, " %s ", outputProps.Columns)
}
create.WriteString(outputProps.tblConfig())

tableConfig := outputProps.tblConfig()
create.WriteString(tableConfig)

// validate incremental strategy
if outputProps.IncrementalStrategy == drivers.IncrementalStrategyPartitionOverwrite &&
!strings.Contains(strings.ToUpper(tableConfig), "PARTITION BY") {
return fmt.Errorf("clickhouse: incremental strategy partition_overwrite requires a partition key")
}

// create table
err := c.Exec(ctx, &drivers.Statement{Query: create.String(), Priority: 100})
Expand Down Expand Up @@ -659,6 +741,47 @@ func (c *connection) acquireConn(ctx context.Context) (*sqlx.Conn, func() error,
return conn, release, nil
}

// getTableEngine looks up the engine of table `name` in the current database
// by querying system.tables.
//
// It returns an empty engine and a nil error when no matching row exists, so
// callers comparing the result against a specific engine name (for example
// "Distributed") keep working for unknown tables. Query, scan and row-iteration
// errors are returned as-is.
func (c *connection) getTableEngine(ctx context.Context, name string) (string, error) {
	res, err := c.Execute(ctx, &drivers.Statement{
		Query:    "SELECT engine FROM system.tables WHERE database = currentDatabase() AND name = ?",
		Args:     []any{name},
		Priority: 1,
	})
	if err != nil {
		return "", err
	}
	defer res.Close()

	var engine string
	if res.Next() {
		if err := res.Scan(&engine); err != nil {
			return "", err
		}
	}
	// Next reports false both for "no rows" and for a failed iteration;
	// only Err can tell the two apart, so do not treat a failure as "not found".
	if err := res.Err(); err != nil {
		return "", err
	}
	return engine, nil
}

// getTablePartitions returns the distinct partition values recorded in
// system.parts for table `name` in the current database.
//
// The lookup is scoped to currentDatabase(), matching getTableEngine; without
// that filter a table with the same name in another database would contribute
// partitions that do not exist in the table being inspected.
//
// The result is nil when the table has no parts. Query, scan and row-iteration
// errors are returned as-is.
func (c *connection) getTablePartitions(ctx context.Context, name string) ([]string, error) {
	res, err := c.Execute(ctx, &drivers.Statement{
		Query:    "SELECT DISTINCT partition FROM system.parts WHERE database = currentDatabase() AND table = ?",
		Args:     []any{name},
		Priority: 1,
	})
	if err != nil {
		return nil, err
	}
	defer res.Close()

	// collect partitions
	var partitions []string
	for res.Next() {
		var part string
		if err := res.Scan(&part); err != nil {
			return nil, err
		}
		partitions = append(partitions, part)
	}
	// A failed iteration ends the loop just like exhaustion does; surface it
	// instead of returning a silently truncated partition list.
	if err := res.Err(); err != nil {
		return nil, err
	}
	return partitions, nil
}

func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
if r == nil {
return nil, nil
Expand Down
132 changes: 132 additions & 0 deletions runtime/drivers/clickhouse/olap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clickhouse_test
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"testing"

"github.com/rilldata/rill/runtime/drivers"
Expand Down Expand Up @@ -37,6 +38,8 @@ func testClickhouseSingleHost(t *testing.T, dsn string) {
})
t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) })
t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) })
t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })

}
Expand All @@ -56,6 +59,8 @@ func testClickhouseCluster(t *testing.T, dsn, cluster string) {
})
t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) })
t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) })
t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })
}

Expand Down Expand Up @@ -119,6 +124,133 @@ func testCreateTableAsSelect(t *testing.T, olap drivers.OLAPStore) {
require.NoError(t, err)
}

// testInsertTableAsSelect_WithAppend creates "append_tbl" with a single row,
// appends a second row through InsertTableAsSelect using the append strategy,
// and checks that both rows can be read back.
func testInsertTableAsSelect_WithAppend(t *testing.T, olap drivers.OLAPStore) {
	type row struct {
		ID     int
		Planet string
	}

	ctx := context.Background()

	err := olap.CreateTableAsSelect(ctx, "append_tbl", false, "SELECT 1 AS id, 'Earth' AS planet", map[string]any{
		"engine":                   "MergeTree",
		"table":                    "tbl",
		"distributed.sharding_key": "rand()",
		"incremental_strategy":     drivers.IncrementalStrategyAppend,
	})
	require.NoError(t, err)

	err = olap.InsertTableAsSelect(ctx, "append_tbl", "SELECT 2 AS id, 'Mars' AS planet", false, true, drivers.IncrementalStrategyAppend, nil)
	require.NoError(t, err)

	res, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT id, planet FROM append_tbl ORDER BY id"})
	require.NoError(t, err)

	var result []row
	for res.Next() {
		var r row
		require.NoError(t, res.Scan(&r.ID, &r.Planet))
		result = append(result, r)
	}
	// Surface iteration failures and release the result set so it does not
	// stay open for the remaining subtests.
	require.NoError(t, res.Err())
	require.NoError(t, res.Close())

	expected := []row{
		{1, "Earth"},
		{2, "Mars"},
	}

	// Convert the result set to a map to represent the set
	resultSet := make(map[int]string, len(result))
	for _, r := range result {
		resultSet[r.ID] = r.Planet
	}

	// Check if the expected values are present in the result set
	for _, e := range expected {
		value, exists := resultSet[e.ID]
		require.True(t, exists, "Expected ID %d to be present in the result set", e.ID)
		require.Equal(t, e.Planet, value, "Expected planet for ID %d to be %s, but got %s", e.ID, e.Planet, value)
	}
}

// testInsertTableAsSelect_WithMerge creates "replace_tbl" and then asserts that
// an InsertTableAsSelect call using the merge strategy fails with an error
// whose message contains "not supported".
func testInsertTableAsSelect_WithMerge(t *testing.T, olap drivers.OLAPStore) {
	ctx := context.Background()

	// Output properties for the table under test.
	props := map[string]any{
		"typs":                     "TABLE",
		"engine":                   "MergeTree",
		"table":                    "tbl",
		"distributed.sharding_key": "rand()",
		"incremental_strategy":     drivers.IncrementalStrategyMerge,
		"order_by":                 "value",
		"primary_key":              "value",
	}
	createErr := olap.CreateTableAsSelect(ctx, "replace_tbl", false, "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", props)
	require.NoError(t, createErr)

	insertErr := olap.InsertTableAsSelect(ctx, "replace_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", false, true, drivers.IncrementalStrategyMerge, []string{"id"})
	// Only inspect the message when an error was actually returned.
	if !assert.Error(t, insertErr) {
		return
	}
	assert.Contains(t, insertErr.Error(), "not supported")
}

// testInsertTableAsSelect_WithPartitionOverwrite creates "replace_tbl"
// partitioned by id with rows 0..4 marked 'insert', then inserts rows 2..5
// marked 'replace' using the partition_overwrite strategy. Partitions present
// in the new data (ids 2..5) must end up holding the new rows, while untouched
// partitions (ids 0 and 1) must keep the original rows.
//
// NOTE(review): this helper does not appear in the t.Run lists visible in this
// change; it needs to be registered there to actually execute.
func testInsertTableAsSelect_WithPartitionOverwrite(t *testing.T, olap drivers.OLAPStore) {
	type row struct {
		ID    int
		Value string
	}

	ctx := context.Background()

	err := olap.CreateTableAsSelect(ctx, "replace_tbl", false, "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", map[string]any{
		// NOTE(review): "typs" looks like a typo for "typ" — confirm against the output properties struct.
		"typs":                     "TABLE",
		"engine":                   "MergeTree",
		"table":                    "tbl",
		"distributed.sharding_key": "rand()",
		"incremental_strategy":     drivers.IncrementalStrategyPartitionOverwrite,
		"partition_by":             "id",
		"order_by":                 "value",
		"primary_key":              "value",
	})
	require.NoError(t, err)

	// The insert must use the strategy under test. Passing the merge strategy
	// here would exercise a different code path than the one this test targets.
	err = olap.InsertTableAsSelect(ctx, "replace_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", false, true, drivers.IncrementalStrategyPartitionOverwrite, nil)
	require.NoError(t, err)

	res, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT id, value FROM replace_tbl ORDER BY id"})
	require.NoError(t, err)

	var result []row
	for res.Next() {
		var r row
		require.NoError(t, res.Scan(&r.ID, &r.Value))
		result = append(result, r)
	}
	// Surface iteration failures and release the result set.
	require.NoError(t, res.Err())
	require.NoError(t, res.Close())

	// generate_series(2, 5) is inclusive, so id 5 arrives as a brand-new partition.
	expected := []row{
		{0, "insert"},
		{1, "insert"},
		{2, "replace"},
		{3, "replace"},
		{4, "replace"},
		{5, "replace"},
	}

	// Convert the result set to a map to represent the set
	resultSet := make(map[int]string, len(result))
	for _, r := range result {
		resultSet[r.ID] = r.Value
	}

	// Check if the expected values are present in the result set
	for _, e := range expected {
		value, exists := resultSet[e.ID]
		require.True(t, exists, "Expected ID %d to be present in the result set", e.ID)
		require.Equal(t, e.Value, value, "Expected value for ID %d to be %s, but got %s", e.ID, e.Value, value)
	}
}

func testDictionary(t *testing.T, olap drivers.OLAPStore) {
err := olap.CreateTableAsSelect(context.Background(), "dict", false, "SELECT 1 AS id, 'Earth' AS planet", map[string]any{"table": "Dictionary", "primary_key": "id"})
require.NoError(t, err)
Expand Down
47 changes: 46 additions & 1 deletion runtime/drivers/duckdb/olap_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func Test_connection_DropTable(t *testing.T) {
require.NoError(t, res.Close())
}

func Test_connection_InsertTableAsSelect(t *testing.T) {
func Test_connection_InsertTableAsSelect_WithAppendStrategy(t *testing.T) {
temp := t.TempDir()

dbPath := filepath.Join(temp, "view.db")
Expand Down Expand Up @@ -199,6 +199,51 @@ func Test_connection_InsertTableAsSelect(t *testing.T) {
require.NoError(t, res.Close())
}

// Test_connection_InsertTableAsSelect_WithMergeStrategy seeds "test-merge" with
// ranges 0..3 tagged 'insert', merges ranges 2..3 tagged 'merge' keyed on the
// "range" column, and checks that the overlapping rows were replaced while the
// others were left untouched.
func Test_connection_InsertTableAsSelect_WithMergeStrategy(t *testing.T) {
	type row struct {
		Range    int
		Strategy string
	}

	ctx := context.Background()
	dir := t.TempDir()

	// Open a fresh DuckDB handle backed by the temp directory.
	cfg := map[string]any{"path": filepath.Join(dir, "view.db"), "external_table_storage": true}
	handle, err := Driver{}.Open("default", cfg, storage.MustNew(dir, nil), activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)
	c := handle.(*connection)
	require.NoError(t, c.Migrate(ctx))
	c.AsOLAP("default")

	// Seed, then merge.
	require.NoError(t, c.CreateTableAsSelect(ctx, "test-merge", false, "SELECT range, 'insert' AS strategy FROM range(0, 4)", nil))
	require.NoError(t, c.InsertTableAsSelect(ctx, "test-merge", "SELECT range, 'merge' AS strategy FROM range(2, 4)", false, true, drivers.IncrementalStrategyMerge, []string{"range"}))

	res, err := c.Execute(ctx, &drivers.Statement{Query: "SELECT range, strategy FROM 'test-merge' ORDER BY range"})
	require.NoError(t, err)

	// Read every row back in order.
	var got []row
	for res.Next() {
		var cur row
		require.NoError(t, res.Scan(&cur.Range, &cur.Strategy))
		got = append(got, cur)
	}
	require.NoError(t, res.Close())

	want := []row{
		{Range: 0, Strategy: "insert"},
		{Range: 1, Strategy: "insert"},
		{Range: 2, Strategy: "merge"},
		{Range: 3, Strategy: "merge"},
	}
	require.Equal(t, want, got)
}

func Test_connection_RenameTable(t *testing.T) {
temp := t.TempDir()
os.Mkdir(temp, fs.ModePerm)
Expand Down
7 changes: 4 additions & 3 deletions runtime/drivers/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ type IngestionSummary struct {
type IncrementalStrategy string

// Known incremental strategies. Each constant is declared exactly once;
// redeclaring a name inside the same const block does not compile.
const (
	// IncrementalStrategyUnspecified is the zero value, used when no strategy has been set.
	IncrementalStrategyUnspecified IncrementalStrategy = ""
	// IncrementalStrategyAppend is the "append" strategy.
	IncrementalStrategyAppend IncrementalStrategy = "append"
	// IncrementalStrategyMerge is the "merge" strategy.
	IncrementalStrategyMerge IncrementalStrategy = "merge"
	// IncrementalStrategyPartitionOverwrite is the "partition_overwrite" strategy.
	IncrementalStrategyPartitionOverwrite IncrementalStrategy = "partition_overwrite"
)

// Dialect enumerates OLAP query languages.
Expand Down

0 comments on commit d3c4c5f

Please sign in to comment.