Skip to content

Commit

Permalink
Emit ClickHouse database size metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
begelundmuller committed Dec 27, 2024
1 parent e208b54 commit 221b323
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 10 deletions.
4 changes: 4 additions & 0 deletions runtime/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/conncache"
"github.com/rilldata/rill/runtime/pkg/observability"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -102,6 +103,9 @@ func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig
}

activityDims := instanceAnnotationsToAttribs(inst)
if cfg.provision {
activityDims = append(activityDims, attribute.Bool("managed", true))
}
if activityClient != nil {
activityClient = activityClient.With(activityDims...)
}
Expand Down
109 changes: 99 additions & 10 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/XSAM/otelsql"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/rilldata/rill/runtime/storage"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -235,16 +237,25 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
return nil, fmt.Errorf("clickhouse version must be 22.7 or higher")
}

conn := &connection{
db: db,
config: conf,
logger: logger,
metaSem: semaphore.NewWeighted(1),
olapSem: priorityqueue.NewSemaphore(maxOpenConnections - 1),
opts: opts,
embed: embed,
ctx, cancel := context.WithCancel(context.Background())
c := &connection{
db: db,
config: conf,
logger: logger,
activity: ac,
instanceID: instanceID,
ctx: ctx,
cancel: cancel,
metaSem: semaphore.NewWeighted(1),
olapSem: priorityqueue.NewSemaphore(maxOpenConnections - 1),
opts: opts,
embed: embed,
}
return conn, nil

c.used()
go c.periodicallyEmitStats(time.Minute)

return c, nil
}

func (d driver) Spec() drivers.Spec {
Expand All @@ -266,6 +277,14 @@ type connection struct {
activity *activity.Client
instanceID string

// context that is cancelled when the connection is closed
ctx context.Context
cancel context.CancelFunc

// lastUsedUnixTime stores the time we last queried the connection.
// This is used to guess if the DB may currently be scaled to zero.
lastUsedUnixTime atomic.Int64

// logic around this copied from duckDB driver
// This driver may issue both OLAP and "meta" queries (like catalog info) against DuckDB.
// Meta queries are usually fast, but OLAP queries may take a long time. To enable predictable parallel performance,
Expand All @@ -284,7 +303,9 @@ type connection struct {

// Ping implements drivers.Handle.
// It pings the underlying database and then marks the connection as used,
// regardless of whether the ping succeeded. The ping error (if any) is returned as-is.
func (c *connection) Ping(ctx context.Context) error {
	err := c.db.PingContext(ctx)
	// Record the activity so lastUsedOn() reflects this round trip.
	c.used()
	return err
}

// Driver implements drivers.Connection.
Expand All @@ -301,6 +322,8 @@ func (c *connection) Config() map[string]any {

// Close implements drivers.Connection.
func (c *connection) Close() error {
c.cancel()

errDB := c.db.Close()

var errEmbed error
Expand Down Expand Up @@ -402,3 +425,69 @@ func (c *connection) AsNotifier(properties map[string]any) (drivers.Notifier, er
func (c *connection) AcquireLongRunning(ctx context.Context) (func(), error) {
	// Long-running acquisition is not supported here: callers always get a nil
	// release function together with a "not implemented" error.
	var release func()
	return release, fmt.Errorf("not implemented")
}

// used records "now" as the moment the connection was last queried.
// Call it after a query to the database completes; the stored value is what lastUsedOn() reports,
// which in turn is used to guess whether the DB may currently be scaled to zero.
//
// Periodic background jobs that consult lastUsedOn must NOT call this function:
// doing so would keep bumping the timestamp and the database would never scale to zero.
func (c *connection) used() {
	// Stored with second granularity as a Unix timestamp.
	nowUnix := time.Now().Unix()
	c.lastUsedUnixTime.Store(nowUnix)
}

// lastUsedOn reports when the connection was last queried, as recorded by used().
// The result has second precision and can be used to guess if the DB may currently be scaled to zero.
func (c *connection) lastUsedOn() time.Time {
	secs := c.lastUsedUnixTime.Load()
	return time.Unix(secs, 0)
}

// periodicallyEmitStats collects stats about the database once every d and emits them as activity events.
// It returns immediately when no activity client is configured; otherwise it blocks until c.ctx is cancelled,
// so it is meant to be run in its own goroutine.
func (c *connection) periodicallyEmitStats(d time.Duration) {
	// Without an activity client there is nowhere to report stats to.
	if c.activity == nil {
		return
	}

	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		// Wait for the next tick, or stop once the connection's context is cancelled.
		select {
		case <-c.ctx.Done():
			return
		case <-tick.C:
		}

		// Skip this round if the DB hasn't been used recently and may be scaled to zero.
		if c.config.CanScaleToZero {
			if idle := time.Since(c.lastUsedOn()); idle > 2*d {
				continue
			}
		}

		// Emit the estimated size of the database.
		size, err := c.estimateSize(c.ctx)
		switch {
		case err == nil:
			c.activity.RecordMetric(c.ctx, "clickhouse_estimated_size_bytes", float64(size))
		case !errors.Is(err, c.ctx.Err()):
			// Errors matching the context's own error are not logged.
			c.logger.Error("failed to estimate clickhouse size", zap.Error(err))
		}
	}
}

// estimateSize returns the estimated combined on-disk size, in bytes, of all resources in the database.
// It sums bytes_on_disk over the active parts in system.parts, excluding the 'information_schema' and
// 'system' databases. If the query yields no row, the size is reported as 0.
func (c *connection) estimateSize(ctx context.Context) (int64, error) {
	const query = `SELECT sum(bytes_on_disk) AS size FROM system.parts WHERE (active = 1) AND lower(database) NOT IN ('information_schema', 'system')`

	rows, err := c.db.QueryxContext(ctx, query)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	// No row available: either iteration failed (rows.Err is non-nil) or the result set was empty.
	if !rows.Next() {
		return 0, rows.Err()
	}

	var total int64
	if err := rows.Scan(&total); err != nil {
		return 0, err
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}

	return total, nil
}
1 change: 1 addition & 0 deletions runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ func (c *connection) acquireConn(ctx context.Context) (*sqlx.Conn, func() error,
}

release := func() error {
c.used()
return conn.Close()
}
return conn, release, nil
Expand Down

0 comments on commit 221b323

Please sign in to comment.