From 99f75880053d51cc83e71c05d2fc499d4a11f3b2 Mon Sep 17 00:00:00 2001 From: Bartek Kubiak Date: Fri, 27 Oct 2023 13:23:04 +0200 Subject: [PATCH] add support for sharding in clickhouse (#1) --- database/clickhouse/README.md | 4 +- database/clickhouse/clickhouse.go | 69 +++++++++++++++++++++++++------ 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/database/clickhouse/README.md b/database/clickhouse/README.md index 96ad79f1c..5ca719ef9 100644 --- a/database/clickhouse/README.md +++ b/database/clickhouse/README.md @@ -7,12 +7,13 @@ | `x-migrations-table`| Name of the migrations table | | `x-migrations-table-engine`| Engine to use for the migrations table, defaults to TinyLog | | `x-cluster-name` | Name of cluster for creating `schema_migrations` table cluster wide | +| `x-sharding` | Use distributed `schema_migrations` table in cluster with sharding enabled | | `database` | The name of the database to connect to | | `username` | The user to sign in as | | `password` | The user's password | | `host` | The host to connect to. | | `port` | The port to bind to. | -| `x-multi-statement` | false | Enable multiple statements to be ran in a single migration (See note below) | +| `x-multi-statement` | Enable multiple statements to be ran in a single migration | ## Notes @@ -23,3 +24,4 @@ * Clickhouse cluster mode is not officially supported, since it's not tested right now, but you can try enabling `schema_migrations` table replication by specifying a `x-cluster-name`: * When `x-cluster-name` is specified, `x-migrations-table-engine` also should be specified. See the docs regarding [replicated table engines](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/#table_engines-replication). * When `x-cluster-name` is specified, only the `schema_migrations` table is replicated across the cluster. You still need to write your migrations so that the application tables are replicated within the cluster. + * When `x-cluster-name` is specified, you can use `x-sharding` if your cluster supports multiple shards. The `schema_migrations` table will be created as distributed table so that is is accessible on any shard. diff --git a/database/clickhouse/clickhouse.go b/database/clickhouse/clickhouse.go index ee1ac209f..b98cc1693 100644 --- a/database/clickhouse/clickhouse.go +++ b/database/clickhouse/clickhouse.go @@ -34,6 +34,7 @@ type Config struct { MigrationsTableEngine string MultiStatementEnabled bool MultiStatementMaxSize int + Sharding bool } func init() { @@ -101,6 +102,7 @@ func (ch *ClickHouse) Open(dsn string) (database.Driver, error) { ClusterName: purl.Query().Get("x-cluster-name"), MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true", MultiStatementMaxSize: multiStatementMaxSize, + Sharding: purl.Query().Get("x-sharding") == "true", }, } @@ -232,21 +234,32 @@ func (ch *ClickHouse) ensureVersionTable() (err error) { } // if not, create the empty migration table - if len(ch.config.ClusterName) > 0 { - query = fmt.Sprintf(` - CREATE TABLE %s ON CLUSTER %s ( - version Int64, - dirty UInt8, - sequence UInt64 - ) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine) + return ch.createVersionTable() +} + +func (ch *ClickHouse) createVersionTable() (err error) { + if len(ch.config.ClusterName) == 0 { + return ch.createBasicVersionTable(ch.config.MigrationsTable) + } + + if ch.config.Sharding { + localTableName := ch.config.MigrationsTable + "_local" + if err := ch.createClusterWideVersionTable(localTableName); err != nil { + return err + } + return ch.createDistributedVersionTable(ch.config.MigrationsTable, localTableName) } else { - query = fmt.Sprintf(` - CREATE TABLE %s ( - version Int64, - dirty UInt8, - sequence UInt64 - ) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine) + return ch.createClusterWideVersionTable(ch.config.MigrationsTable) } +} + +func (ch *ClickHouse) createBasicVersionTable(tableName string) (err error) { + query := fmt.Sprintf(` + CREATE TABLE %s ( + version Int64, + dirty UInt8, + sequence UInt64 + ) Engine=%s`, tableName, ch.config.MigrationsTableEngine) if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") { query = fmt.Sprintf(`%s ORDER BY sequence`, query) @@ -258,6 +271,36 @@ func (ch *ClickHouse) ensureVersionTable() (err error) { return nil } +func (ch *ClickHouse) createClusterWideVersionTable(tableName string) (err error) { + query := fmt.Sprintf(` + CREATE TABLE %s ON CLUSTER %s ( + version Int64, + dirty UInt8, + sequence UInt64 + ) Engine=%s`, tableName, ch.config.ClusterName, ch.config.MigrationsTableEngine) + if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") { + query = fmt.Sprintf(`%s ORDER BY sequence`, query) + } + + if _, err := ch.conn.Exec(query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil +} + +func (ch *ClickHouse) createDistributedVersionTable(distTableName, localTableName string) (err error) { + query := fmt.Sprintf(` + CREATE TABLE %s ON CLUSTER %s + AS %s + ENGINE = Distributed(%s, %s, %s, cityHash64(sequence))`, + distTableName, ch.config.ClusterName, localTableName, ch.config.ClusterName, ch.config.DatabaseName, localTableName) + + if _, err := ch.conn.Exec(query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil +} + func (ch *ClickHouse) Drop() (err error) { query := "SHOW TABLES FROM " + ch.config.DatabaseName tables, err := ch.conn.Query(query)