From d3c4c5f41138b679227d56d580e65a4acf054e10 Mon Sep 17 00:00:00 2001
From: Rohith Reddy Kota
Date: Thu, 5 Dec 2024 14:08:53 -0500
Subject: [PATCH] Implement Incremental Merge Strategy for Clickhouse (#6069)

* implement IncrementalReplaceStrategy

* apply replace incremental strategy for clickhouse

* cosmetic change

* cosmetic change

* fix fmt issues

* defer drop table

* Simplified and added support of distributed table

* Simplified and added support of distributed table

* Refactor clickhouse driver partition handling

* Validate incremental strategy for partition key presence

* Refactor table operations into helper methods

* Improve partition handling and table creation checks

* Fix case sensitivity in partition key validation.

* Fix SQL query string formatting in ClickHouse driver

* Use deterministic names for temporary tables

* Update DuckDB driver test to use new storage initialization

* Add partition overwrite incremental strategy

---------

Co-authored-by: e.sevastyanov
---
 runtime/drivers/clickhouse/model_manager.go |   2 +-
 runtime/drivers/clickhouse/olap.go          | 132 ++++++++++++++++++-
 runtime/drivers/clickhouse/olap_test.go     | 140 ++++++++++++++++++++
 runtime/drivers/duckdb/olap_crud_test.go    |  48 ++++++-
 runtime/drivers/olap.go                     |   7 +-
 5 files changed, 322 insertions(+), 7 deletions(-)

diff --git a/runtime/drivers/clickhouse/model_manager.go b/runtime/drivers/clickhouse/model_manager.go
index dcb3e5f7d16..82dac5954c9 100644
--- a/runtime/drivers/clickhouse/model_manager.go
+++ b/runtime/drivers/clickhouse/model_manager.go
@@ -88,7 +88,7 @@ func (p *ModelOutputProperties) Validate(opts *drivers.ModelExecuteOptions) erro
 	}
 
 	switch p.IncrementalStrategy {
-	case drivers.IncrementalStrategyUnspecified, drivers.IncrementalStrategyAppend:
+	case drivers.IncrementalStrategyUnspecified, drivers.IncrementalStrategyAppend, drivers.IncrementalStrategyPartitionOverwrite:
 	default:
 		return fmt.Errorf("invalid incremental strategy %q", p.IncrementalStrategy)
 	}
diff --git a/runtime/drivers/clickhouse/olap.go b/runtime/drivers/clickhouse/olap.go
index 55fb2fbac66..68406e7162a 100644
--- a/runtime/drivers/clickhouse/olap.go
+++ b/runtime/drivers/clickhouse/olap.go
@@ -2,6 +2,7 @@ package clickhouse
 
 import (
 	"context"
+	"crypto/md5"
 	"errors"
 	"fmt"
 	"strings"
@@ -251,7 +252,80 @@ func (c *connection) InsertTableAsSelect(ctx context.Context, name, sql string,
 		})
 	}
 
-	// merge strategy is also not supported for clickhouse
+	if strategy == drivers.IncrementalStrategyPartitionOverwrite {
+		_, onCluster, err := informationSchema{c: c}.entityType(ctx, "", name)
+		if err != nil {
+			return err
+		}
+		onClusterClause := ""
+		if onCluster {
+			onClusterClause = "ON CLUSTER " + safeSQLName(c.config.Cluster)
+		}
+		// Get the engine info of the given table
+		engine, err := c.getTableEngine(ctx, name)
+		if err != nil {
+			return err
+		}
+		// Distributed table cannot be altered directly, so we need to alter the local table
+		if engine == "Distributed" {
+			name = localTableName(name)
+		}
+		// create temp table with the same schema using a deterministic name (the source table name is quoted like every other identifier)
+		tempName := fmt.Sprintf("__rill_temp_%s_%x", name, md5.Sum([]byte(sql)))
+		err = c.Exec(ctx, &drivers.Statement{
+			Query:    fmt.Sprintf("CREATE OR REPLACE TABLE %s %s AS %s", safeSQLName(tempName), onClusterClause, safeSQLName(name)),
+			Priority: 1,
+		})
+		if err != nil {
+			return err
+		}
+		// clean up the temp table
+		defer func() {
+			var cancel context.CancelFunc
+
+			// If the original context is cancelled, create a new context for cleanup
+			if ctx.Err() != nil {
+				ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
+			} else {
+				cancel = func() {}
+			}
+			defer cancel()
+
+			err := c.Exec(ctx, &drivers.Statement{
+				Query:    fmt.Sprintf("DROP TABLE %s %s", safeSQLName(tempName), onClusterClause),
+				Priority: 1,
+			})
+			if err != nil {
+				c.logger.Warn("clickhouse: failed to drop temp table", zap.String("name", tempName), zap.Error(err))
+			}
+		}()
+		// insert into temp table
+		err = c.Exec(ctx, &drivers.Statement{
+			Query:    fmt.Sprintf("INSERT INTO %s %s", safeSQLName(tempName), sql),
+			Priority: 1,
+		})
+		if err != nil {
+			return err
+		}
+		// list the IDs of the partitions present in the temp table
+		partitions, err := c.getTablePartitions(ctx, tempName)
+		if err != nil {
+			return err
+		}
+		// iterate over partitions and replace them in the main table
+		for _, part := range partitions {
+			// address the partition by ID so that no type-dependent quoting of the partition value is needed
+			err = c.Exec(ctx, &drivers.Statement{
+				Query:    fmt.Sprintf("ALTER TABLE %s %s REPLACE PARTITION ID '%s' FROM %s", safeSQLName(name), onClusterClause, part, safeSQLName(tempName)),
+				Priority: 1,
+			})
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
 	return fmt.Errorf("incremental insert strategy %q not supported", strategy)
 }
 
@@ -477,7 +551,15 @@ func (c *connection) createTable(ctx context.Context, name, sql string, outputPr
 	} else {
 		fmt.Fprintf(&create, " %s ", outputProps.Columns)
 	}
-	create.WriteString(outputProps.tblConfig())
+
+	tableConfig := outputProps.tblConfig()
+	create.WriteString(tableConfig)
+
+	// validate incremental strategy
+	if outputProps.IncrementalStrategy == drivers.IncrementalStrategyPartitionOverwrite &&
+		!strings.Contains(strings.ToUpper(tableConfig), "PARTITION BY") {
+		return fmt.Errorf("clickhouse: incremental strategy partition_overwrite requires a partition key")
+	}
 
 	// create table
 	err := c.Exec(ctx, &drivers.Statement{Query: create.String(), Priority: 100})
@@ -659,6 +741,52 @@ func (c *connection) acquireConn(ctx context.Context) (*sqlx.Conn, func() error,
 	return conn, release, nil
 }
 
+// getTableEngine returns the engine of the named table in the current database.
+// It returns an empty string (and no error) when no matching row is found.
+func (c *connection) getTableEngine(ctx context.Context, name string) (string, error) {
+	var engine string
+	res, err := c.Execute(ctx, &drivers.Statement{
+		Query:    "SELECT engine FROM system.tables WHERE database = currentDatabase() AND name = ?",
+		Args:     []any{name},
+		Priority: 1,
+	})
+	if err != nil {
+		return "", err
+	}
+	defer res.Close()
+	if res.Next() {
+		if err := res.Scan(&engine); err != nil {
+			return "", err
+		}
+	}
+	return engine, nil
+}
+
+// getTablePartitions returns the distinct IDs of the partitions that have active parts
+// in the named table of the current database. IDs are returned instead of partition
+// values so that callers can use them verbatim in a `PARTITION ID '...'` clause.
+func (c *connection) getTablePartitions(ctx context.Context, name string) ([]string, error) {
+	res, err := c.Execute(ctx, &drivers.Statement{
+		Query:    "SELECT DISTINCT partition_id FROM system.parts WHERE database = currentDatabase() AND table = ? AND active",
+		Args:     []any{name},
+		Priority: 1,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer res.Close()
+	// collect partition IDs
+	var partitions []string
+	for res.Next() {
+		var part string
+		if err := res.Scan(&part); err != nil {
+			return nil, err
+		}
+		partitions = append(partitions, part)
+	}
+	return partitions, nil
+}
+
 func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
 	if r == nil {
 		return nil, nil
diff --git a/runtime/drivers/clickhouse/olap_test.go b/runtime/drivers/clickhouse/olap_test.go
index d05e2767431..da37a738226 100644
--- a/runtime/drivers/clickhouse/olap_test.go
+++ b/runtime/drivers/clickhouse/olap_test.go
@@ -3,6 +3,7 @@ package clickhouse_test
 import (
 	"context"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"testing"
 
 	"github.com/rilldata/rill/runtime/drivers"
@@ -37,6 +38,9 @@ func testClickhouseSingleHost(t *testing.T, dsn string) {
 	})
 	t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) })
 	t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) })
+	t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
+	t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
+	t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, olap) })
 	t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })
 }
 
@@ -56,6 +60,9 @@ func testClickhouseCluster(t *testing.T, dsn, cluster string) {
 	})
 	t.Run("RenameTable", func(t *testing.T) { testRenameTable(t, olap) })
 	t.Run("CreateTableAsSelect", func(t *testing.T) { testCreateTableAsSelect(t, olap) })
+	t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
+	t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
+	t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, olap) })
 	t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })
 }
 
@@ -119,6 +126,139 @@ func testCreateTableAsSelect(t *testing.T, olap drivers.OLAPStore) {
 	require.NoError(t, err)
 }
 
+// testInsertTableAsSelect_WithAppend checks that the append strategy adds the selected rows to an existing table.
+func testInsertTableAsSelect_WithAppend(t *testing.T, olap drivers.OLAPStore) {
+	err := olap.CreateTableAsSelect(context.Background(), "append_tbl", false, "SELECT 1 AS id, 'Earth' AS planet", map[string]any{
+		"engine":                   "MergeTree",
+		"table":                    "tbl",
+		"distributed.sharding_key": "rand()",
+		"incremental_strategy":     drivers.IncrementalStrategyAppend,
+	})
+	require.NoError(t, err)
+
+	err = olap.InsertTableAsSelect(context.Background(), "append_tbl", "SELECT 2 AS id, 'Mars' AS planet", false, true, drivers.IncrementalStrategyAppend, nil)
+	require.NoError(t, err)
+
+	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT id, planet FROM append_tbl ORDER BY id"})
+	require.NoError(t, err)
+
+	var result []struct {
+		ID     int
+		Planet string
+	}
+
+	for res.Next() {
+		var r struct {
+			ID     int
+			Planet string
+		}
+		require.NoError(t, res.Scan(&r.ID, &r.Planet))
+		result = append(result, r)
+	}
+	require.NoError(t, res.Close())
+
+	expected := []struct {
+		ID     int
+		Planet string
+	}{
+		{1, "Earth"},
+		{2, "Mars"},
+	}
+
+	// Convert the result set to a map to represent the set
+	resultSet := make(map[int]string)
+	for _, r := range result {
+		resultSet[r.ID] = r.Planet
+	}
+
+	// Check if the expected values are present in the result set
+	for _, e := range expected {
+		value, exists := resultSet[e.ID]
+		require.True(t, exists, "Expected ID %d to be present in the result set", e.ID)
+		require.Equal(t, e.Planet, value, "Expected planet for ID %d to be %s, but got %s", e.ID, e.Planet, value)
+	}
+}
+
+// testInsertTableAsSelect_WithMerge checks that the merge strategy is rejected as not supported.
+func testInsertTableAsSelect_WithMerge(t *testing.T, olap drivers.OLAPStore) {
+	err := olap.CreateTableAsSelect(context.Background(), "replace_tbl", false, "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", map[string]any{
+		"typs":                     "TABLE",
+		"engine":                   "MergeTree",
+		"table":                    "tbl",
+		"distributed.sharding_key": "rand()",
+		"incremental_strategy":     drivers.IncrementalStrategyMerge,
+		"order_by":                 "value",
+		"primary_key":              "value",
+	})
+	require.NoError(t, err)
+
+	err = olap.InsertTableAsSelect(context.Background(), "replace_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", false, true, drivers.IncrementalStrategyMerge, []string{"id"})
+	if assert.Error(t, err) {
+		assert.Contains(t, err.Error(), "not supported")
+	}
+}
+
+// testInsertTableAsSelect_WithPartitionOverwrite checks that only the partitions present in the inserted data are replaced or added.
+func testInsertTableAsSelect_WithPartitionOverwrite(t *testing.T, olap drivers.OLAPStore) {
+	err := olap.CreateTableAsSelect(context.Background(), "partition_overwrite_tbl", false, "SELECT generate_series AS id, 'insert' AS value FROM generate_series(0, 4)", map[string]any{
+		"typs":                     "TABLE",
+		"engine":                   "MergeTree",
+		"table":                    "tbl",
+		"distributed.sharding_key": "rand()",
+		"incremental_strategy":     drivers.IncrementalStrategyPartitionOverwrite,
+		"partition_by":             "id",
+		"order_by":                 "value",
+		"primary_key":              "value",
+	})
+	require.NoError(t, err)
+
+	err = olap.InsertTableAsSelect(context.Background(), "partition_overwrite_tbl", "SELECT generate_series AS id, 'replace' AS value FROM generate_series(2, 5)", false, true, drivers.IncrementalStrategyPartitionOverwrite, nil)
+	require.NoError(t, err)
+
+	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT id, value FROM partition_overwrite_tbl ORDER BY id"})
+	require.NoError(t, err)
+
+	var result []struct {
+		ID    int
+		Value string
+	}
+
+	for res.Next() {
+		var r struct {
+			ID    int
+			Value string
+		}
+		require.NoError(t, res.Scan(&r.ID, &r.Value))
+		result = append(result, r)
+	}
+	require.NoError(t, res.Close())
+
+	expected := []struct {
+		ID    int
+		Value string
+	}{
+		{0, "insert"},
+		{1, "insert"},
+		{2, "replace"},
+		{3, "replace"},
+		{4, "replace"},
+		{5, "replace"},
+	}
+
+	// Convert the result set to a map to represent the set
+	resultSet := make(map[int]string)
+	for _, r := range result {
+		resultSet[r.ID] = r.Value
+	}
+
+	// Check if the expected values are present in the result set
+	for _, e := range expected {
+		value, exists := resultSet[e.ID]
+		require.True(t, exists, "Expected ID %d to be present in the result set", e.ID)
+		require.Equal(t, e.Value, value, "Expected value for ID %d to be %s, but got %s", e.ID, e.Value, value)
+	}
+}
+
 func testDictionary(t *testing.T, olap drivers.OLAPStore) {
 	err := olap.CreateTableAsSelect(context.Background(), "dict", false, "SELECT 1 AS id, 'Earth' AS planet", map[string]any{"table": "Dictionary", "primary_key": "id"})
 	require.NoError(t, err)
diff --git a/runtime/drivers/duckdb/olap_crud_test.go b/runtime/drivers/duckdb/olap_crud_test.go
index 7ebf25827f0..93dac42d44b 100644
--- a/runtime/drivers/duckdb/olap_crud_test.go
+++ b/runtime/drivers/duckdb/olap_crud_test.go
@@ -171,7 +171,7 @@ func Test_connection_DropTable(t *testing.T) {
 	require.NoError(t, res.Close())
 }
 
-func Test_connection_InsertTableAsSelect(t *testing.T) {
+func Test_connection_InsertTableAsSelect_WithAppendStrategy(t *testing.T) {
 	temp := t.TempDir()
 
 	dbPath := filepath.Join(temp, "view.db")
@@ -199,6 +199,52 @@ func Test_connection_InsertTableAsSelect(t *testing.T) {
 	require.NoError(t, res.Close())
 }
 
+// Test_connection_InsertTableAsSelect_WithMergeStrategy checks that rows matching the unique key are replaced by the merged rows.
+func Test_connection_InsertTableAsSelect_WithMergeStrategy(t *testing.T) {
+	temp := t.TempDir()
+
+	dbPath := filepath.Join(temp, "view.db")
+	handle, err := Driver{}.Open("default", map[string]any{"path": dbPath, "external_table_storage": true}, storage.MustNew(temp, nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+	c := handle.(*connection)
+	require.NoError(t, c.Migrate(context.Background()))
+	c.AsOLAP("default")
+
+	err = c.CreateTableAsSelect(context.Background(), "test-merge", false, "SELECT range, 'insert' AS strategy FROM range(0, 4)", nil)
+	require.NoError(t, err)
+
+	err = c.InsertTableAsSelect(context.Background(), "test-merge", "SELECT range, 'merge' AS strategy FROM range(2, 4)", false, true, drivers.IncrementalStrategyMerge, []string{"range"})
+	require.NoError(t, err)
+
+	res, err := c.Execute(context.Background(), &drivers.Statement{Query: "SELECT range, strategy FROM 'test-merge' ORDER BY range"})
+	require.NoError(t, err)
+
+	var results []struct {
+		Range    int
+		Strategy string
+	}
+	for res.Next() {
+		var r struct {
+			Range    int
+			Strategy string
+		}
+		require.NoError(t, res.Scan(&r.Range, &r.Strategy))
+		results = append(results, r)
+	}
+	require.NoError(t, res.Close())
+
+	expected := []struct {
+		Range    int
+		Strategy string
+	}{
+		{0, "insert"},
+		{1, "insert"},
+		{2, "merge"},
+		{3, "merge"},
+	}
+	require.Equal(t, expected, results)
+}
+
 func Test_connection_RenameTable(t *testing.T) {
 	temp := t.TempDir()
 	os.Mkdir(temp, fs.ModePerm)
diff --git a/runtime/drivers/olap.go b/runtime/drivers/olap.go
index 0bfa53ded69..f8fc7634958 100644
--- a/runtime/drivers/olap.go
+++ b/runtime/drivers/olap.go
@@ -160,9 +160,10 @@ type IngestionSummary struct {
 type IncrementalStrategy string
 
 const (
-	IncrementalStrategyUnspecified IncrementalStrategy = ""
-	IncrementalStrategyAppend      IncrementalStrategy = "append"
-	IncrementalStrategyMerge       IncrementalStrategy = "merge"
+	IncrementalStrategyUnspecified        IncrementalStrategy = ""
+	IncrementalStrategyAppend             IncrementalStrategy = "append"
+	IncrementalStrategyMerge              IncrementalStrategy = "merge"
+	IncrementalStrategyPartitionOverwrite IncrementalStrategy = "partition_overwrite"
 )
 
 // Dialect enumerates OLAP query languages.