Skip to content

Commit

Permalink
add support for sharding in clickhouse (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkubiak authored Oct 27, 2023
1 parent 856ea12 commit 99f7588
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
4 changes: 3 additions & 1 deletion database/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
| `x-migrations-table`| Name of the migrations table |
| `x-migrations-table-engine`| Engine to use for the migrations table, defaults to TinyLog |
| `x-cluster-name` | Name of cluster for creating `schema_migrations` table cluster wide |
| `x-sharding` | Use a distributed `schema_migrations` table in a cluster with sharding enabled |
| `database` | The name of the database to connect to |
| `username` | The user to sign in as |
| `password` | The user's password |
| `host` | The host to connect to. |
| `port` | The port to bind to. |
| `x-multi-statement` | false | Enable multiple statements to be ran in a single migration (See note below) |
| `x-multi-statement` | Enable multiple statements to be run in a single migration |

## Notes

Expand All @@ -23,3 +24,4 @@
* Clickhouse cluster mode is not officially supported, since it's not tested right now, but you can try enabling `schema_migrations` table replication by specifying a `x-cluster-name`:
* When `x-cluster-name` is specified, `x-migrations-table-engine` also should be specified. See the docs regarding [replicated table engines](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/#table_engines-replication).
* When `x-cluster-name` is specified, only the `schema_migrations` table is replicated across the cluster. You still need to write your migrations so that the application tables are replicated within the cluster.
* When `x-cluster-name` is specified, you can use `x-sharding` if your cluster supports multiple shards. The `schema_migrations` table will be created as a distributed table so that it is accessible on any shard.
69 changes: 56 additions & 13 deletions database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
MigrationsTableEngine string
MultiStatementEnabled bool
MultiStatementMaxSize int
Sharding bool
}

func init() {
Expand Down Expand Up @@ -101,6 +102,7 @@ func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
ClusterName: purl.Query().Get("x-cluster-name"),
MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true",
MultiStatementMaxSize: multiStatementMaxSize,
Sharding: purl.Query().Get("x-sharding") == "true",
},
}

Expand Down Expand Up @@ -232,21 +234,32 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
}

// if not, create the empty migration table
if len(ch.config.ClusterName) > 0 {
query = fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine)
return ch.createVersionTable()
}

// createVersionTable creates the migrations table, choosing the layout from
// the driver configuration:
//   - no cluster name configured: a basic table;
//   - cluster name with sharding enabled: a cluster-wide "<table>_local" table
//     plus a distributed table under the configured migrations table name;
//   - cluster name without sharding: a single cluster-wide table.
//
// It returns whatever error the selected helper returns.
func (ch *ClickHouse) createVersionTable() (err error) {
if len(ch.config.ClusterName) == 0 {
return ch.createBasicVersionTable(ch.config.MigrationsTable)
}

if ch.config.Sharding {
// The underlying table gets a "_local" suffix so that the configured
// name can be used for the distributed table created on top of it.
localTableName := ch.config.MigrationsTable + "_local"
if err := ch.createClusterWideVersionTable(localTableName); err != nil {
return err
}
return ch.createDistributedVersionTable(ch.config.MigrationsTable, localTableName)
} else {
query = fmt.Sprintf(`
CREATE TABLE %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine)
return ch.createClusterWideVersionTable(ch.config.MigrationsTable)
}
}

func (ch *ClickHouse) createBasicVersionTable(tableName string) (err error) {
query := fmt.Sprintf(`
CREATE TABLE %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, tableName, ch.config.MigrationsTableEngine)

if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") {
query = fmt.Sprintf(`%s ORDER BY sequence`, query)
Expand All @@ -258,6 +271,36 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
return nil
}

// createClusterWideVersionTable creates the migrations table tableName with a
// CREATE TABLE ... ON CLUSTER statement for the configured cluster name, using
// the configured migrations table engine.
//
// A failed statement is reported as a *database.Error that carries the
// original error and the query text.
func (ch *ClickHouse) createClusterWideVersionTable(tableName string) (err error) {
	engine := ch.config.MigrationsTableEngine

	ddl := fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, tableName, ch.config.ClusterName, engine)

	// Engines whose name ends in "Tree" get an ORDER BY clause appended.
	if strings.HasSuffix(engine, "Tree") {
		ddl = fmt.Sprintf(`%s ORDER BY sequence`, ddl)
	}

	_, execErr := ch.conn.Exec(ddl)
	if execErr != nil {
		return &database.Error{OrigErr: execErr, Query: []byte(ddl)}
	}
	return nil
}

// createDistributedVersionTable creates distTableName on the configured
// cluster with the structure of localTableName (CREATE TABLE ... AS) and the
// Distributed engine. The engine is pointed at localTableName in the
// configured database and uses cityHash64(sequence) as the sharding expression.
//
// A failed statement is reported as a *database.Error that carries the
// original error and the query text.
func (ch *ClickHouse) createDistributedVersionTable(distTableName, localTableName string) (err error) {
	cluster := ch.config.ClusterName

	stmt := fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s
AS %s
ENGINE = Distributed(%s, %s, %s, cityHash64(sequence))`,
		distTableName, cluster, localTableName, cluster, ch.config.DatabaseName, localTableName)

	_, execErr := ch.conn.Exec(stmt)
	if execErr != nil {
		return &database.Error{OrigErr: execErr, Query: []byte(stmt)}
	}
	return nil
}

func (ch *ClickHouse) Drop() (err error) {
query := "SHOW TABLES FROM " + ch.config.DatabaseName
tables, err := ch.conn.Query(query)
Expand Down

0 comments on commit 99f7588

Please sign in to comment.