diff --git a/admin/admin.go b/admin/admin.go
index 83625e93507..f85e25122ef 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -2,6 +2,7 @@ package admin
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"cloud.google.com/go/storage"
@@ -131,6 +132,21 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Is
 }
 
 func (s *Service) Close() error {
+	// Close every provisioner, collecting errors instead of stopping at the first failure.
+	var allErrs error
+	for _, p := range s.ProvisionerSet {
+		err := p.Close()
+		if err != nil {
+			allErrs = errors.Join(allErrs, err)
+		}
+	}
+
 	s.Used.Close()
-	return s.DB.Close()
+
+	err := s.DB.Close()
+	if err != nil {
+		allErrs = errors.Join(allErrs, err)
+	}
+
+	return allErrs
 }
diff --git a/admin/provisioner/clickhousestatic/provisioner.go b/admin/provisioner/clickhousestatic/provisioner.go
new file mode 100644
index 00000000000..1432e3c8649
--- /dev/null
+++ b/admin/provisioner/clickhousestatic/provisioner.go
@@ -0,0 +1,283 @@
+package clickhousestatic
+
+import (
+	"context"
+	"crypto/rand"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/url"
+	"strings"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/rilldata/rill/admin/database"
+	"github.com/rilldata/rill/admin/provisioner"
+	"go.uber.org/zap"
+)
+
+func init() {
+	provisioner.Register("clickhouse-static", New)
+}
+
+// Spec is the JSON-encoded configuration of the provisioner, as parsed by New.
+type Spec struct {
+	// DSN with admin permissions for a Clickhouse service.
+	// This will be used to create a new (virtual) database and access-restricted user for each provisioned resource.
+	DSN string `json:"dsn"`
+}
+
+// Provisioner provisions Clickhouse resources using a static, multi-tenant Clickhouse service.
+// It creates a new (virtual) database and user with access restricted to that database for each resource.
+type Provisioner struct {
+	spec   *Spec
+	logger *zap.Logger
+	ch     *sql.DB
+}
+
+var _ provisioner.Provisioner = (*Provisioner)(nil)
+
+// New creates a clickhouse-static provisioner from a JSON-encoded Spec.
+// It opens a database handle for the spec's DSN; the handle is released by Close.
+func New(specJSON []byte, _ database.DB, logger *zap.Logger) (provisioner.Provisioner, error) {
+	spec := &Spec{}
+	err := json.Unmarshal(specJSON, spec)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse provisioner spec: %w", err)
+	}
+
+	opts, err := clickhouse.ParseDSN(spec.DSN)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse DSN: %w", err)
+	}
+	ch := clickhouse.OpenDB(opts)
+
+	return &Provisioner{
+		spec:   spec,
+		logger: logger,
+		ch:     ch,
+	}, nil
+}
+
+// Type returns the provisioner type identifier, "clickhouse-static".
+func (p *Provisioner) Type() string {
+	return "clickhouse-static"
+}
+
+// Close closes the admin database handle opened by New.
+func (p *Provisioner) Close() error {
+	return p.ch.Close()
+}
+
+// Provision creates a database and an access-restricted user for the resource and returns a copy of the resource with the connection DSN in its config.
+// If the resource's config already contains a DSN, it only verifies that the DSN is usable and returns the resource unchanged.
+// It returns provisioner.ErrResourceTypeNotSupported for resource types other than ResourceTypeClickHouse.
+func (p *Provisioner) Provision(ctx context.Context, r *provisioner.Resource, opts *provisioner.ResourceOptions) (*provisioner.Resource, error) {
+	// Can only provision clickhouse resources
+	if r.Type != provisioner.ResourceTypeClickHouse {
+		return nil, provisioner.ErrResourceTypeNotSupported
+	}
+
+	// Parse the resource's config (in case it's an update/check)
+	cfg, err := provisioner.NewClickhouseConfig(r.Config)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the config has already been populated, do a health check and exit early (currently there's nothing to update).
+	if cfg.DSN != "" {
+		err := p.pingWithResourceDSN(ctx, cfg.DSN)
+		if err != nil {
+			return nil, fmt.Errorf("failed to ping clickhouse resource: %w", err)
+		}
+
+		return r, nil
+	}
+
+	// Tolerate nil options instead of panicking; they are only used for the database comment below.
+	if opts == nil {
+		opts = &provisioner.ResourceOptions{}
+	}
+
+	// Prepare for creating the schema and user.
+	// Identifiers can't be passed as query arguments, so they are backtick-quoted before being interpolated.
+	id := strings.ReplaceAll(r.ID, "-", "")
+	dbName := fmt.Sprintf("rill_%s", id)
+	user := fmt.Sprintf("rill_%s", id)
+	dbIdent := quoteIdentifier(dbName)
+	userIdent := quoteIdentifier(user)
+	password := newPassword()
+	annotationsJSON, err := json.Marshal(opts.Annotations)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal annotations: %w", err)
+	}
+
+	// Idempotently create the schema
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s COMMENT ?", dbIdent), string(annotationsJSON))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create clickhouse database: %w", err)
+	}
+
+	// Idempotently create the user
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf("CREATE USER IF NOT EXISTS %s IDENTIFIED WITH sha256_password BY ? DEFAULT DATABASE %s GRANTEES NONE", userIdent, dbIdent), password)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create clickhouse user: %w", err)
+	}
+
+	// If the user already existed (e.g. an earlier provision was interrupted), CREATE USER left its old password in place.
+	// Reset it so that the DSN returned below always carries working credentials.
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf("ALTER USER %s IDENTIFIED WITH sha256_password BY ?", userIdent), password)
+	if err != nil {
+		return nil, fmt.Errorf("failed to set clickhouse user password: %w", err)
+	}
+
+	// Grant privileges on the database to the user
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf(`
+		GRANT
+			SELECT,
+			INSERT,
+			ALTER,
+			CREATE TABLE,
+			CREATE DICTIONARY,
+			CREATE VIEW,
+			DROP TABLE,
+			DROP DICTIONARY,
+			DROP VIEW,
+			TRUNCATE,
+			OPTIMIZE,
+			SHOW DICTIONARIES,
+			dictGet
+		ON %s.* TO %s
+	`, dbIdent, userIdent))
+	if err != nil {
+		return nil, fmt.Errorf("failed to grant privileges to clickhouse user: %w", err)
+	}
+
+	// Grant some additional global privileges to the user
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf(`
+		GRANT
+			URL,
+			REMOTE,
+			MONGO,
+			MYSQL,
+			POSTGRES,
+			S3,
+			AZURE
+		ON *.* TO %s
+	`, userIdent))
+	if err != nil {
+		return nil, fmt.Errorf("failed to grant global privileges to clickhouse user: %w", err)
+	}
+
+	// Build DSN for the resource and return it
+	dsn, err := url.Parse(p.spec.DSN)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse base DSN: %w", err)
+	}
+	dsn.User = url.UserPassword(user, password)
+	dsn.Path = dbName
+	cfg = &provisioner.ClickhouseConfig{
+		DSN: dsn.String(),
+	}
+	return &provisioner.Resource{
+		ID:     r.ID,
+		Type:   r.Type,
+		State:  nil,
+		Config: cfg.AsMap(),
+	}, nil
+}
+
+// Deprovision drops the database and user named in the resource's DSN.
+// It does nothing if the resource's config does not contain a DSN.
+func (p *Provisioner) Deprovision(ctx context.Context, r *provisioner.Resource) error {
+	// Check it's a clickhouse resource
+	if r.Type != provisioner.ResourceTypeClickHouse {
+		return fmt.Errorf("unexpected resource type %q", r.Type)
+	}
+
+	// Parse the resource's config
+	cfg, err := provisioner.NewClickhouseConfig(r.Config)
+	if err != nil {
+		return err
+	}
+
+	// Exit early if the config is empty (nothing to deprovision)
+	if cfg.DSN == "" {
+		return nil
+	}
+
+	// Parse the DSN
+	opts, err := clickhouse.ParseDSN(cfg.DSN)
+	if err != nil {
+		return fmt.Errorf("failed to parse DSN during deprovisioning: %w", err)
+	}
+
+	// Refuse to build DROP statements from a DSN that doesn't name both a database and a user.
+	if opts.Auth.Database == "" || opts.Auth.Username == "" {
+		return fmt.Errorf("failed to deprovision: resource DSN is missing a database or username")
+	}
+
+	// Drop the database
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS %s", quoteIdentifier(opts.Auth.Database)))
+	if err != nil {
+		return fmt.Errorf("failed to drop clickhouse database: %w", err)
+	}
+
+	// Drop the user
+	_, err = p.ch.ExecContext(ctx, fmt.Sprintf("DROP USER IF EXISTS %s", quoteIdentifier(opts.Auth.Username)))
+	if err != nil {
+		return fmt.Errorf("failed to drop clickhouse user: %w", err)
+	}
+
+	return nil
+}
+
+// AwaitReady is a no-op; Provision runs all of its statements before returning.
+func (p *Provisioner) AwaitReady(ctx context.Context, r *provisioner.Resource) error {
+	return nil
+}
+
+// Check is a no-op and always returns nil.
+func (p *Provisioner) Check(ctx context.Context) error {
+	return nil
+}
+
+// CheckResource checks the resource by delegating to Provision.
+func (p *Provisioner) CheckResource(ctx context.Context, r *provisioner.Resource, opts *provisioner.ResourceOptions) (*provisioner.Resource, error) {
+	// Provision is idempotent and will do nothing if the resource is already provisioned.
+	return p.Provision(ctx, r, opts)
+}
+
+// pingWithResourceDSN opens a temporary connection with the resource's own DSN and runs a trivial query to verify it works.
+func (p *Provisioner) pingWithResourceDSN(ctx context.Context, dsn string) error {
+	db, err := sql.Open("clickhouse", dsn)
+	if err != nil {
+		return fmt.Errorf("failed to open tenant connection: %w", err)
+	}
+	defer db.Close()
+
+	_, err = db.ExecContext(ctx, "SELECT 1")
+	if err != nil {
+		return fmt.Errorf("failed to execute query on tenant: %w", err)
+	}
+
+	return nil
+}
+
+// quoteIdentifier backtick-quotes a Clickhouse identifier, escaping backslashes and backticks,
+// so it can be interpolated into statements where query arguments are not supported.
+func quoteIdentifier(ident string) string {
+	return "`" + strings.NewReplacer("\\", "\\\\", "`", "\\`").Replace(ident) + "`"
+}
+
+// newPassword returns a random password: a fixed prefix followed by 16 random bytes in hex.
+// It panics if reading from the system's random source fails.
+func newPassword() string {
+	var b [16]byte
+	_, err := io.ReadFull(rand.Reader, b[:])
+	if err != nil {
+		panic(err)
+	}
+	// The fixed prefix ensures the password contains a digit, uppercase and lowercase letters, and a special character.
+	return fmt.Sprintf("1Rr!%x", b[:])
+}
diff --git a/admin/provisioner/clickhousestatic/provisioner_test.go b/admin/provisioner/clickhousestatic/provisioner_test.go
new file mode 100644
index 00000000000..da86898a47d
--- /dev/null
+++ b/admin/provisioner/clickhousestatic/provisioner_test.go
@@ -0,0 +1,154 @@
+package clickhousestatic
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/google/uuid"
+	"github.com/rilldata/rill/admin/provisioner"
+	"github.com/stretchr/testify/require"
+	"github.com/testcontainers/testcontainers-go"
+	testcontainersclickhouse "github.com/testcontainers/testcontainers-go/modules/clickhouse"
+	"go.uber.org/zap"
+)
+
+func Test(t *testing.T) {
+	// Create a test ClickHouse cluster
+	container, err := testcontainersclickhouse.Run(
+		context.Background(),
+		"clickhouse/clickhouse-server:24.6.2.17",
+		// Add a user config file that enables access management for the "default" user
+		testcontainers.CustomizeRequestOption(func(req *testcontainers.GenericContainerRequest) error {
+			req.Files = append(req.Files, testcontainers.ContainerFile{
+				Reader:            strings.NewReader(`<clickhouse><users><default><access_management>1</access_management></default></users></clickhouse>`),
+				ContainerFilePath: "/etc/clickhouse-server/users.d/default.xml",
+				FileMode:          0o755,
+			})
+			return nil
+		}),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		err := container.Terminate(context.Background())
+		require.NoError(t, err)
+	})
+	host, err := container.Host(context.Background())
+	require.NoError(t, err)
+	port, err := container.MappedPort(context.Background(), "9000/tcp")
+	require.NoError(t, err)
+	dsn := fmt.Sprintf("clickhouse://default:default@%v:%v", host, port.Port())
+
+	// Create the provisioner
+	specJSON, err := json.Marshal(&Spec{
+		DSN: dsn,
+	})
+	require.NoError(t, err)
+	p, err := New(specJSON, nil, zap.NewNop())
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, p.Close())
+	})
+
+	// Provision two resources
+	r1, db1 := provisionClickHouse(t, p)
+	defer db1.Close()
+	r2, db2 := provisionClickHouse(t, p)
+	defer db2.Close()
+
+	// Check the resources are different
+	require.NotEqual(t, r1.ID, r2.ID)
+	require.NotEqual(t, r1.Config["dsn"], r2.Config["dsn"])
+
+	// Create a table with the first connection
+	_, err = db1.Exec("CREATE TABLE test (id UInt64) ENGINE = Memory")
+	require.NoError(t, err)
+	_, err = db1.Exec("INSERT INTO test VALUES (1)")
+	require.NoError(t, err)
+	rows, err := db1.Query("SELECT COUNT(*) FROM system.tables")
+	require.NoError(t, err)
+	for rows.Next() {
+		var count int
+		err = rows.Scan(&count)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+	}
+	require.NoError(t, rows.Err())
+	rows.Close()
+
+	// Get the name of the first connection's database
+	dsn1, err := clickhouse.ParseDSN(r1.Config["dsn"].(string))
+	require.NoError(t, err)
+	db1Name := dsn1.Auth.Database
+
+	// Check the second connection doesn't have access to the table in the first connection
+	_, err = db2.Exec(fmt.Sprintf("SELECT * FROM %s.test", db1Name))
+	require.Error(t, err)
+	_, err = db2.Exec("SELECT * FROM test")
+	require.Error(t, err)
+
+	// Check the second connection can't see the other connection's tables in the information schema
+	rows, err = db2.Query("SELECT name FROM system.tables")
+	require.NoError(t, err)
+	for rows.Next() {
+		require.Fail(t, "unexpected visible table in information schema")
+	}
+	require.NoError(t, rows.Err())
+	rows.Close()
+
+	// Deprovision the resources
+	err = p.Deprovision(context.Background(), r1)
+	require.NoError(t, err)
+	err = p.Deprovision(context.Background(), r2)
+	require.NoError(t, err)
+
+	// Check the connections no longer work now that their users and databases are gone
+	_, err = db1.Exec("SELECT 1")
+	require.Error(t, err)
+	_, err = db2.Exec("SELECT 1")
+	require.Error(t, err)
+}
+
+// provisionClickHouse provisions a new ClickHouse resource with p and returns it together with an open connection to it.
+func provisionClickHouse(t *testing.T, p provisioner.Provisioner) (*provisioner.Resource, *sql.DB) {
+	t.Helper()
+
+	// Provision a new resource
+	in := &provisioner.Resource{
+		ID:     uuid.New().String(),
+		Type:   provisioner.ResourceTypeClickHouse,
+		State:  nil,
+		Config: nil,
+	}
+	opts := &provisioner.ResourceOptions{
+		Args:        nil,
+		Annotations: map[string]string{"organization": "test", "project": "test"},
+		RillVersion: "dev",
+	}
+	out, err := p.Provision(context.Background(), in, opts)
+	require.NoError(t, err)
+
+	// Check the resource
+	require.Equal(t, in.ID, out.ID)
+	require.Equal(t, in.Type, out.Type)
+	require.Empty(t, out.State)
+	require.NotEmpty(t, out.Config)
+
+	// Check that the provisioned resource passes CheckResource
+	_, err = p.CheckResource(context.Background(), out, opts)
+	require.NoError(t, err)
+
+	// Open a connection to the database
+	db, err := sql.Open("clickhouse", out.Config["dsn"].(string))
+	require.NoError(t, err)
+
+	// Ping
+	err = db.Ping()
+	require.NoError(t, err)
+
+	return out, db
+}
diff --git a/admin/provisioner/kubernetes/kubernetes.go b/admin/provisioner/kubernetes/kubernetes.go
index ad7620d65a9..f6c350c3f4e 100644
--- a/admin/provisioner/kubernetes/kubernetes.go
+++ b/admin/provisioner/kubernetes/kubernetes.go
@@ -144,6 +144,11 @@ func (p *KubernetesProvisioner) Type() string {
 	return "kubernetes"
 }
 
+// Close is a no-op and always returns nil.
+func (p *KubernetesProvisioner) Close() error {
+	return nil
+}
+
 func (p *KubernetesProvisioner) Provision(ctx context.Context, r *provisioner.Resource, opts *provisioner.ResourceOptions) (*provisioner.Resource, error) {
 	// Can only provision runtime resources
 	if r.Type != provisioner.ResourceTypeRuntime {
diff --git a/admin/provisioner/provisioner.go 
b/admin/provisioner/provisioner.go
index 8b69cfb6099..80a68089e76 100644
--- a/admin/provisioner/provisioner.go
+++ b/admin/provisioner/provisioner.go
@@ -33,6 +33,8 @@ func Register(typ string, fn ProvisionerInitializer) {
 type Provisioner interface {
 	// Type returns the type of the provisioner.
 	Type() string
+	// Close is called when the provisioner is no longer needed, so implementations can release what they hold (e.g. database connections).
+	Close() error
 	// Provision provisions a new resource.
 	// It may be called multiple times for the same ID if:
 	// - the initial provision is interrupted, or
diff --git a/admin/provisioner/resources.go b/admin/provisioner/resources.go
index 62ea26e691f..ab2ba43b39d 100644
--- a/admin/provisioner/resources.go
+++ b/admin/provisioner/resources.go
@@ -10,7 +10,8 @@ import (
 type ResourceType string
 
 const (
-	ResourceTypeRuntime ResourceType = "runtime"
+	ResourceTypeRuntime    ResourceType = "runtime"
+	ResourceTypeClickHouse ResourceType = "clickhouse"
 )
 
 // RuntimeArgs describe the expected arguments for provisioning a runtime resource.
@@ -66,3 +67,26 @@ func (r *RuntimeConfig) AsMap() map[string]any {
 	}
 	return res
 }
+
+// ClickhouseConfig describes the expected config for a provisioned Clickhouse resource (resource type ResourceTypeClickHouse).
+type ClickhouseConfig struct {
+	DSN string `mapstructure:"dsn"` // DSN for connecting to the provisioned resource; the zero value is the empty string.
+}
+
+func NewClickhouseConfig(cfg map[string]any) (*ClickhouseConfig, error) { // Decodes a resource's generic config map into a ClickhouseConfig.
+	res := &ClickhouseConfig{}
+	err := mapstructure.Decode(cfg, res)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse clickhouse config: %w", err)
+	}
+	return res, nil
+}
+
+func (r *ClickhouseConfig) AsMap() map[string]any { // Encodes the config as a generic map (the reverse of NewClickhouseConfig).
+	res := make(map[string]any)
+	err := mapstructure.Decode(r, &res)
+	if err != nil {
+		panic(err) // NOTE(review): presumably unreachable for a plain struct-to-map decode — confirm.
+	}
+	return res
+}
diff --git a/admin/provisioner/static/static.go b/admin/provisioner/static/static.go
index b7128c4d72e..f2d276aabd3 100644
--- a/admin/provisioner/static/static.go
+++ b/admin/provisioner/static/static.go
@@ -62,6 +62,10 @@ func (p *StaticProvisioner) Type() string {
 	return "static"
 }
 
+func (p *StaticProvisioner) Close() error { // No-op: always returns nil.
+	return nil
+}
+
 func (p *StaticProvisioner) Provision(ctx context.Context, r *provisioner.Resource, opts *provisioner.ResourceOptions) (*provisioner.Resource, error) {
 	// Can only provision runtime resources
 	if r.Type != provisioner.ResourceTypeRuntime {
diff --git a/cli/cmd/admin/start.go b/cli/cmd/admin/start.go
index 3aedf25617a..2a4fac28236 100644
--- a/cli/cmd/admin/start.go
+++ b/cli/cmd/admin/start.go
@@ -36,6 +36,7 @@ import (
 
 	// Register database and provisioner implementations
 	_ "github.com/rilldata/rill/admin/database/postgres"
+	_ "github.com/rilldata/rill/admin/provisioner/clickhousestatic" // Blank import: runs the package's init, which registers "clickhouse-static".
 	_ "github.com/rilldata/rill/admin/provisioner/kubernetes"
 	_ "github.com/rilldata/rill/admin/provisioner/static"
 )