Runtime: Refactor conn cache to contain and detect hanging opens/closes #3666

Merged
329 changes: 58 additions & 271 deletions runtime/connection_cache.go
@@ -6,246 +6,81 @@ import (
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/hashicorp/golang-lru/simplelru"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/observability"
"github.com/rilldata/rill/runtime/pkg/conncache"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

var errConnectionCacheClosed = errors.New("connectionCache: closed")

const migrateTimeout = 2 * time.Minute

// connectionCache is a thread-safe cache for open connections.
// Connections should preferably be opened only via the connection cache.
//
// TODO: It opens connections async, but it will close them sync when evicted. If a handle's close hangs, this can block the cache.
// We should move the closing to the background. However, it must then handle the case of trying to re-open a connection that's currently closing in the background.
type connectionCache struct {
size int
runtime *Runtime
logger *zap.Logger
activity activity.Client
closed bool
migrateCtx context.Context // ctx used for connection migrations
migrateCtxCancel context.CancelFunc // cancel all running migrations
lock sync.Mutex
acquired map[string]*connWithRef // items with non-zero references (in use) which should not be evicted
lru *simplelru.LRU // items with no references (opened, but not in use) ready for eviction
}

type connWithRef struct {
type cachedConnectionConfig struct {
instanceID string
handle drivers.Handle
err error
refs int
ready chan struct{}
driver string
shared bool
config map[string]any
}

func newConnectionCache(size int, logger *zap.Logger, rt *Runtime, ac activity.Client) *connectionCache {
// LRU cache that closes evicted connections
lru, err := simplelru.NewLRU(size, func(key interface{}, value interface{}) {
// Skip if the conn has refs, since the callback also gets called when transferring to acquired cache
conn := value.(*connWithRef)
if conn.refs != 0 {
return
}
if conn.handle != nil {
if err := conn.handle.Close(); err != nil {
logger.Error("failed closing cached connection", zap.String("key", key.(string)), zap.Error(err))
}
}
// newConnectionCache returns a concurrency-safe cache for open connections.
// Connections should preferably be opened only via the connection cache.
// Its implementation handles issues such as concurrent open/close/eviction of a connection.
// It also monitors for hanging connections.
func (r *Runtime) newConnectionCache() conncache.Cache {
return conncache.New(conncache.Options{
MaxConnectionsIdle: r.opts.ConnectionCacheSize,
OpenTimeout: 10 * time.Minute,
CloseTimeout: 10 * time.Minute,
CheckHangingInterval: time.Minute,
OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) {
x := cfg.(cachedConnectionConfig)
return r.openAndMigrate(ctx, x)
},
KeyFunc: func(cfg any) string {
x := cfg.(cachedConnectionConfig)
return generateKey(x)
},
HangingFunc: func(cfg any, open bool) {
x := cfg.(cachedConnectionConfig)
r.logger.Error("connection cache: connection has been working for too long", zap.String("instance_id", x.instanceID), zap.String("driver", x.driver), zap.Bool("open", open))
},
})
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())
return &connectionCache{
size: size,
runtime: rt,
logger: logger,
activity: ac,
migrateCtx: ctx,
migrateCtxCancel: cancel,
acquired: make(map[string]*connWithRef),
lru: lru,
}
}
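
For context, the rough shape of the new runtime/pkg/conncache API, as inferred from the calls in this file, is sketched below. This is not the package's actual source; field and method semantics in the comments are assumptions based on how they are used here.

// Sketch only: approximate shape of runtime/pkg/conncache inferred from usage above.
package conncache

import (
	"context"
	"time"
)

type Connection interface {
	Close() error // drivers.Handle satisfies this, so handles can be cached directly
}

type Options struct {
	MaxConnectionsIdle   int           // max number of idle (unreferenced) connections kept open
	OpenTimeout          time.Duration // presumably the time budget for OpenFunc
	CloseTimeout         time.Duration // presumably the time budget for closing an evicted connection
	CheckHangingInterval time.Duration // how often to check for opens/closes exceeding their budget
	OpenFunc             func(ctx context.Context, cfg any) (Connection, error)
	KeyFunc              func(cfg any) string
	HangingFunc          func(cfg any, open bool) // open is likely true for a hanging open, false for a hanging close
}

type Cache interface {
	// Acquire returns an open connection and a release function that the
	// caller must invoke when done with the connection.
	Acquire(ctx context.Context, cfg any) (Connection, func(), error)
	// EvictWhere closes all connections whose config matches the predicate,
	// presumably in the background given the goals of this PR.
	EvictWhere(predicate func(cfg any) bool)
}

// func New(opts Options) Cache — the constructor used by newConnectionCache above.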

func (c *connectionCache) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.closed {
return errConnectionCacheClosed
}
c.closed = true

// Cancel currently running migrations
c.migrateCtxCancel()

var firstErr error
for _, key := range c.lru.Keys() {
val, ok := c.lru.Get(key)
if !ok {
continue
}
conn := val.(*connWithRef)
if conn.handle == nil {
continue
}
err := conn.handle.Close()
if err != nil {
c.logger.Error("failed closing cached connection", zap.Error(err))
if firstErr == nil {
firstErr = err
}
}
}

for _, value := range c.acquired {
if value.handle == nil {
continue
}
err := value.handle.Close()
if err != nil {
c.logger.Error("failed closing cached connection", zap.Error(err))
if firstErr == nil {
firstErr = err
}
}
}

return firstErr
}

func (c *connectionCache) get(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) {
var key string
if shared {
// not using instanceID to ensure all instances share the same handle
key = driver + generateKey(config)
} else {
key = instanceID + driver + generateKey(config)
}

c.lock.Lock()
if c.closed {
c.lock.Unlock()
return nil, nil, errConnectionCacheClosed
}

// Get conn from caches
conn, ok := c.acquired[key]
if ok {
conn.refs++
} else {
var val any
val, ok = c.lru.Get(key)
if ok {
// Conn was found in LRU - move to acquired cache
conn = val.(*connWithRef)
conn.refs++ // NOTE: Must increment before call to c.lru.remove to avoid closing the conn
c.lru.Remove(key)
c.acquired[key] = conn
}
}

// Cached conn not found, open a new one
if !ok {
conn = &connWithRef{
instanceID: instanceID,
refs: 1, // Since refs is assumed to already have been incremented when checking conn.ready
ready: make(chan struct{}),
}
c.acquired[key] = conn

if len(c.acquired)+c.lru.Len() > c.size {
c.logger.Warn("number of connections acquired and in LRU exceed total configured size", zap.Int("acquired", len(c.acquired)), zap.Int("lru", c.lru.Len()))
}

// Open and migrate the connection in a separate goroutine (outside lock).
// Incrementing ref and releasing the conn for this operation separately to cover the case where all waiting goroutines are cancelled before the migration completes.
conn.refs++
go func() {
handle, err := c.openAndMigrate(c.migrateCtx, instanceID, driver, shared, config)
c.lock.Lock()
conn.handle = handle
conn.err = err
c.releaseConn(key, conn)
wasClosed := c.closed
c.lock.Unlock()
close(conn.ready)

// The cache might have been closed while the connection was being opened.
// Since we acquired the lock, the close will have already been completed, so we need to close the connection here.
if wasClosed && handle != nil {
_ = handle.Close()
}
}()
}

// We can now release the lock and wait for the connection to be ready (it might already be)
c.lock.Unlock()

// Wait for connection to be ready or context to be cancelled
var err error
select {
case <-conn.ready:
case <-ctx.Done():
err = ctx.Err() // Will always be non-nil, ensuring releaseConn is called
}

// Lock again for accessing conn
c.lock.Lock()
defer c.lock.Unlock()

if err == nil {
err = conn.err
// getConnection returns a cached connection for the given driver configuration.
func (r *Runtime) getConnection(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) {
cfg := cachedConnectionConfig{
instanceID: instanceID,
driver: driver,
shared: shared,
config: config,
}

handle, release, err := r.connCache.Acquire(ctx, cfg)
if err != nil {
c.releaseConn(key, conn)
return nil, nil, err
}

release := func() {
c.lock.Lock()
c.releaseConn(key, conn)
c.lock.Unlock()
}

return conn.handle, release, nil
return handle.(drivers.Handle), release, nil
}
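
As an illustration, a hypothetical call site (the instance ID, driver, and config values are invented; real callers live elsewhere in the runtime) acquires a handle through the cache and releases it when finished:

// Hypothetical usage sketch of getConnection; values are invented for illustration.
func doWorkWithConnection(ctx context.Context, rt *Runtime) error {
	handle, release, err := rt.getConnection(ctx, "default", "duckdb", map[string]any{"dsn": "main.db"}, false)
	if err != nil {
		return err
	}
	// release tells the cache the handle is no longer in use, so it can be
	// evicted once idle; it must always be called.
	defer release()

	_ = handle // use the drivers.Handle here
	return nil
}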

func (c *connectionCache) releaseConn(key string, conn *connWithRef) {
conn.refs--
if conn.refs == 0 {
// No longer referenced. Move from acquired to LRU.
if !c.closed {
delete(c.acquired, key)
c.lru.Add(key, conn)
}
}
// evictInstanceConnections evicts all connections for the given instance.
func (r *Runtime) evictInstanceConnections(instanceID string) {
r.connCache.EvictWhere(func(cfg any) bool {
x := cfg.(cachedConnectionConfig)
return x.instanceID == instanceID
})
}

func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver string, shared bool, config map[string]any) (drivers.Handle, error) {
logger := c.logger
if instanceID != "default" {
logger = c.logger.With(zap.String("instance_id", instanceID), zap.String("driver", driver))
// openAndMigrate opens a connection and migrates it.
func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig) (drivers.Handle, error) {
logger := r.logger
if cfg.instanceID != "default" {
logger = r.logger.With(zap.String("instance_id", cfg.instanceID), zap.String("driver", cfg.driver))
}

ctx, cancel := context.WithTimeout(ctx, migrateTimeout)
defer cancel()

activityClient := c.activity
if !shared {
inst, err := c.runtime.Instance(ctx, instanceID)
activityClient := r.activity
if !cfg.shared {
inst, err := r.Instance(ctx, cfg.instanceID)
if err != nil {
return nil, err
}
@@ -256,9 +91,9 @@ func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver
}
}

handle, err := drivers.Open(driver, config, shared, activityClient, logger)
handle, err := drivers.Open(cfg.driver, cfg.config, cfg.shared, activityClient, logger)
if err == nil && ctx.Err() != nil {
err = fmt.Errorf("timed out while opening driver %q", driver)
err = fmt.Errorf("timed out while opening driver %q", cfg.driver)
}
if err != nil {
return nil, err
@@ -268,71 +103,23 @@ func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver
if err != nil {
handle.Close()
if errors.Is(err, ctx.Err()) {
err = fmt.Errorf("timed out while migrating driver %q: %w", driver, err)
err = fmt.Errorf("timed out while migrating driver %q: %w", cfg.driver, err)
}
return nil, err
}
return handle, nil
}

// evictAll closes all connections for an instance.
func (c *connectionCache) evictAll(ctx context.Context, instanceID string) {
c.lock.Lock()
defer c.lock.Unlock()

if c.closed {
return
}

for key, conn := range c.acquired {
if conn.instanceID != instanceID {
continue
}

if conn.handle != nil {
err := conn.handle.Close()
if err != nil {
c.logger.Error("connection cache: failed to close cached connection", zap.Error(err), zap.String("instance", instanceID), observability.ZapCtx(ctx))
}
conn.handle = nil
conn.err = fmt.Errorf("connection evicted") // Defensive, should never be accessed
}

delete(c.acquired, key)
}

for _, key := range c.lru.Keys() {
connT, ok := c.lru.Get(key)
if !ok {
panic("connection cache: key not found in LRU")
}
conn := connT.(*connWithRef)

if conn.instanceID != instanceID {
continue
}

if conn.handle != nil {
err := conn.handle.Close()
if err != nil {
c.logger.Error("connection cache: failed to close cached connection", zap.Error(err), zap.String("instance", instanceID), observability.ZapCtx(ctx))
}
conn.handle = nil
conn.err = fmt.Errorf("connection evicted") // Defensive, should never be accessed
}

c.lru.Remove(key)
}
}

func generateKey(m map[string]any) string {
func generateKey(cfg cachedConnectionConfig) string {
sb := strings.Builder{}
keys := maps.Keys(m)
sb.WriteString(cfg.instanceID) // Empty if cfg.shared
sb.WriteString(cfg.driver)
keys := maps.Keys(cfg.config)
slices.Sort(keys)
for _, key := range keys {
sb.WriteString(key)
sb.WriteString(":")
sb.WriteString(fmt.Sprint(m[key]))
sb.WriteString(fmt.Sprint(cfg.config[key]))
sb.WriteString(" ")
}
return sb.String()
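
For example, with hypothetical values, the key concatenates the instance ID, driver, and sorted config pairs, so two instances with an identical driver config still map to distinct cache entries (unless the connection is shared):

// Illustrative only: hypothetical config values showing the key format.
cfg := cachedConnectionConfig{
	instanceID: "inst1",
	driver:     "duckdb",
	config:     map[string]any{"dsn": "main.db", "pool_size": 4},
}
key := generateKey(cfg) // "inst1duckdbdsn:main.db pool_size:4 "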