From fd3d077d5fb20fb3d6a6935f2155a60dc93a3f2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 16:08:53 +0100 Subject: [PATCH] Integrate singleflight with conncache's mutex --- runtime/pkg/conncache/conncache.go | 252 +++++++++++++++-------------- 1 file changed, 128 insertions(+), 124 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index a939e5148c2..0f53ac25497 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/golang-lru/simplelru" - "github.com/rilldata/rill/runtime/pkg/singleflight" ) // Cache is a concurrency-safe cache of stateful connection objects. @@ -59,31 +58,31 @@ type Options struct { var _ Cache = (*cacheImpl)(nil) -// cacheImpl implements Cache. -// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time. -// It also uses an LRU to pool unused connections and eventually close them. -// The implementation heavily depends on implementation details of singleflight.Group. -// Notably, it will in different places invoke the singleflight with different callbacks for the same key. -// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled. +// cacheImpl implements Cache. Implementation notes: +// - It uses an LRU to pool unused connections and eventually close them. +// - It leverages a singleflight pattern to ensure at most one open/close action runs against a connection at a time. +// - An entry will only have entryStatusOpening or entryStatusClosing if a singleflight call is currently running for it. +// - Any code that keeps a reference to an entry after the mutex is released must call retainEntry/releaseEntry. +// - If the ctx for an open call is cancelled, the entry will continue opening in the background (and will be put in the LRU). +// - If attempting to open a closing entry, or close an opening entry, we wait for the singleflight to complete and then retry once. To avoid infinite loops, we don't retry more than once. type cacheImpl struct { opts Options closed bool mu sync.Mutex entries map[string]*entry lru *simplelru.LRU - singleflight *singleflight.Group[string, entryStatus] + singleflight map[string]chan struct{} ctx context.Context cancel context.CancelFunc } type entry struct { - cfg any - refs int - status entryStatus - since time.Time - closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called. - handle Connection - err error + cfg any + refs int + status entryStatus + since time.Time + handle Connection + err error } type entryStatus int @@ -101,7 +100,7 @@ func New(opts Options) Cache { c := &cacheImpl{ opts: opts, entries: make(map[string]*entry), - singleflight: &singleflight.Group[string, entryStatus]{}, + singleflight: make(map[string]chan struct{}), ctx: ctx, cancel: cancel, } @@ -145,33 +144,48 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu return e.handle, c.releaseFunc(k, e), nil } - c.mu.Unlock() + ch, ok := c.singleflight[k] - // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. - // We need to retry once since the singleflight might currently be closing the same entry. - // We don't retry more than once to avoid potentially infinite open/close loops. - for attempt := 0; attempt < 2; attempt++ { - stat, err := c.singleflight.Do(ctx, k, func(_ context.Context) (entryStatus, error) { - c.mu.Lock() - if c.closed { - c.mu.Unlock() - return entryStatusUnspecified, errors.New("conncache: closed") - } - if e.status == entryStatusOpen { - c.mu.Unlock() - return entryStatusOpen, nil - } - c.retainEntry(k, e) - e.status = entryStatusOpening - e.since = time.Now() - e.handle = nil - e.err = nil + if ok && e.status == entryStatusClosing { + c.mu.Unlock() + <-ch + c.mu.Lock() + + // Since we released the lock, need to check c.closed and e.status again. + if c.closed { + c.releaseEntry(k, e) c.mu.Unlock() + return nil, nil, errors.New("conncache: closed") + } + if e.status == entryStatusOpen { + defer c.mu.Unlock() + if e.err != nil { + c.releaseEntry(k, e) + return nil, nil, e.err + } + return e.handle, c.releaseFunc(k, e), nil + } + + ch, ok = c.singleflight[k] + } + + if !ok { + c.retainEntry(k, e) // Retain again to count the goroutine's reference independently (in case ctx is cancelled while the Open continues in the background) + + ch = make(chan struct{}) + c.singleflight[k] = ch + + e.status = entryStatusOpening + e.since = time.Now() + e.handle = nil + e.err = nil + + go func() { var handle Connection var err error if c.opts.OpenTimeout == 0 { - handle, err = c.opts.OpenFunc(ctx, cfg) + handle, err = c.opts.OpenFunc(c.ctx, cfg) } else { ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout) handle, err = c.opts.OpenFunc(ctx, cfg) @@ -179,47 +193,38 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu } c.mu.Lock() - c.releaseEntry(k, e) + defer c.mu.Unlock() + e.status = entryStatusOpen e.since = time.Now() e.handle = handle e.err = err - c.mu.Unlock() - return entryStatusOpen, nil - }) - if err != nil { - // TODO: could be a caught panic. Should we handle panics? - return nil, nil, err - } - - if stat != entryStatusOpen { - // TODO: Too fast - continue - } + delete(c.singleflight, k) + close(ch) - c.mu.Lock() - if e.status != entryStatusOpen { - c.releaseEntry(k, e) - c.mu.Unlock() - return nil, nil, errors.New("conncache: connection was immediately closed after being opened") - } - handle := e.handle - err = e.err - if e.err != nil { c.releaseEntry(k, e) - c.mu.Unlock() - return nil, nil, err - } - c.mu.Unlock() - return handle, c.releaseFunc(k, e), nil + }() } - c.mu.Lock() - c.releaseEntry(k, e) c.mu.Unlock() - return nil, nil, errors.New("conncache: connection was closed repeatedly while trying to open it") + <-ch + + c.mu.Lock() + defer c.mu.Unlock() + + if e.status != entryStatusOpen { + c.releaseEntry(k, e) + return nil, nil, errors.New("conncache: connection was immediately closed after being opened") + } + + if e.err != nil { + c.releaseEntry(k, e) + return nil, nil, e.err + } + + return e.handle, c.releaseFunc(k, e), nil } func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) { @@ -249,32 +254,26 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.mu.Unlock() for { - if ctx.Err() != nil { - return ctx.Err() - } - c.mu.Lock() - var anyE *entry - for _, e := range c.entries { - if e.status != entryStatusClosed { - anyE = e - break - } + var anyCh chan struct{} + for _, ch := range c.singleflight { + anyCh = ch + break } + c.mu.Unlock() - if anyE == nil { - c.mu.Unlock() + if anyCh == nil { // all entries are closed, we can return - break + return nil } - anyE.closedCh = make(chan struct{}) - c.mu.Unlock() - - <-anyE.closedCh + select { + case <-anyCh: + // continue + case <-ctx.Done(): + return ctx.Err() + } } - - return nil } // beginClose must be called while c.mu is held. @@ -285,51 +284,55 @@ func (c *cacheImpl) beginClose(k string, e *entry) { c.retainEntry(k, e) - go func() { - // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. - // We need to retry once since the singleflight might currently be opening the same entry. - // We don't retry more than once to avoid potentially infinite open/close loops. - for attempt := 0; attempt < 2; attempt++ { - stat, _ := c.singleflight.Do(context.Background(), k, func(_ context.Context) (entryStatus, error) { - c.mu.Lock() - if e.status == entryStatusClosed { - c.mu.Unlock() - return entryStatusClosed, nil - } - e.status = entryStatusClosing - e.since = time.Now() - c.mu.Unlock() + ch, ok := c.singleflight[k] + if ok { + c.mu.Unlock() + <-ch + c.mu.Lock() - var err error - if e.handle != nil { - err = e.handle.Close() - } - if err == nil { - err = errors.New("conncache: connection closed") - } + _, ok = c.singleflight[k] + if ok { + // Probably, another goroutine started closing it. Very unlikely, it was closed and re-opened again. + // Either way, we did our part. + c.releaseEntry(k, e) + return + } - c.mu.Lock() - e.status = entryStatusClosed - e.since = time.Now() - if e.closedCh != nil { - close(e.closedCh) - } - e.handle = nil - e.err = err - c.mu.Unlock() + // Since we released the lock, need to check e.status again. + // (Doesn't need to check entryStatusClosing since we now know that the singleflight is empty.) + if e.status == entryStatusClosed { + c.releaseEntry(k, e) + return + } + } - return entryStatusClosed, nil - }) - // TODO: can return err on panic in Close. Should we handle panics? + ch = make(chan struct{}) + c.singleflight[k] = ch - if stat == entryStatusClosed { - break - } + e.status = entryStatusClosing + e.since = time.Now() + + go func() { + var err error + if e.handle != nil { + err = e.handle.Close() + } + if err == nil { + err = errors.New("conncache: connection closed") } c.mu.Lock() + defer c.mu.Unlock() + + e.status = entryStatusClosed + e.since = time.Now() + e.handle = nil + e.err = err + + delete(c.singleflight, k) + close(ch) + c.releaseEntry(k, e) - c.mu.Unlock() }() } @@ -380,7 +383,8 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { select { case <-ticker.C: c.mu.Lock() - for _, e := range c.entries { + for k := range c.singleflight { + e := c.entries[k] if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout { c.opts.HangingFunc(e.cfg, true) }