Integrate singleflight with conncache's mutex
begelundmuller committed Dec 14, 2023
1 parent 4833bf0 commit fd3d077
Showing 1 changed file with 128 additions and 124 deletions.
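For context on the change: this commit drops the dependency on the external singleflight package and instead coordinates per-key open/close work through the cache's own mutex, using a map[string]chan struct{} keyed like the entries map. Below is a minimal, illustrative sketch of that pattern only; the sketchCache type, newSketchCache constructor and do function are made-up names and are not code from this commit.

package sketch

import "sync"

type sketchCache struct {
	mu           sync.Mutex
	singleflight map[string]chan struct{} // one channel per key with an operation in flight
}

func newSketchCache() *sketchCache {
	return &sketchCache{singleflight: make(map[string]chan struct{})}
}

// do runs fn for key, unless another call is already in flight for that key,
// in which case it simply waits for the in-flight call to finish.
func (c *sketchCache) do(key string, fn func()) {
	c.mu.Lock()
	if ch, ok := c.singleflight[key]; ok {
		// Another goroutine is already working on this key: wait for it instead of starting new work.
		c.mu.Unlock()
		<-ch
		return
	}
	ch := make(chan struct{})
	c.singleflight[key] = ch
	c.mu.Unlock()

	fn() // run the expensive open/close work outside the lock

	c.mu.Lock()
	delete(c.singleflight, key)
	c.mu.Unlock()
	close(ch) // wake all waiters
}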
252 changes: 128 additions & 124 deletions runtime/pkg/conncache/conncache.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/hashicorp/golang-lru/simplelru"
"github.com/rilldata/rill/runtime/pkg/singleflight"
)

// Cache is a concurrency-safe cache of stateful connection objects.
@@ -59,31 +58,31 @@ type Options struct {

var _ Cache = (*cacheImpl)(nil)

// cacheImpl implements Cache.
// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time.
// It also uses an LRU to pool unused connections and eventually close them.
// The implementation heavily depends on implementation details of singleflight.Group.
// Notably, it will in different places invoke the singleflight with different callbacks for the same key.
// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled.
// cacheImpl implements Cache. Implementation notes:
// - It uses an LRU to pool unused connections and eventually close them.
// - It leverages a singleflight pattern to ensure at most one open/close action runs against a connection at a time.
// - An entry will only have entryStatusOpening or entryStatusClosing if a singleflight call is currently running for it.
// - Any code that keeps a reference to an entry after the mutex is released must call retainEntry/releaseEntry.
// - If the ctx for an open call is cancelled, the entry will continue opening in the background (and will be put in the LRU).
// - If attempting to open a closing entry, or close an opening entry, we wait for the singleflight to complete and then retry once. To avoid infinite loops, we don't retry more than once.
type cacheImpl struct {
opts Options
closed bool
mu sync.Mutex
entries map[string]*entry
lru *simplelru.LRU
singleflight *singleflight.Group[string, entryStatus]
singleflight map[string]chan struct{}
ctx context.Context
cancel context.CancelFunc
}

type entry struct {
cfg any
refs int
status entryStatus
since time.Time
closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called.
handle Connection
err error
cfg any
refs int
status entryStatus
since time.Time
handle Connection
err error
}

type entryStatus int
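The retainEntry/releaseEntry rule in the cacheImpl doc comment above is reference counting: any code that keeps using an entry after dropping c.mu must first bump its ref count so the entry cannot be evicted or pooled underneath it. Those helpers are not part of this hunk; the sketch below is an assumed shape of them, shown only to illustrate the rule (the real implementation may differ, for example in how it handles a closed cache or LRU eviction).

// Assumed sketch of the retain/release helpers referenced above (not shown in this diff).
// Both must be called while c.mu is held.
func (c *cacheImpl) retainEntry(k string, e *entry) {
	e.refs++
	// An entry that is in use should not sit in the idle pool where it could be evicted.
	c.lru.Remove(k)
}

func (c *cacheImpl) releaseEntry(k string, e *entry) {
	e.refs--
	if e.refs == 0 {
		// No users left: hand the entry back to the idle LRU pool.
		c.lru.Add(k, e)
	}
}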
@@ -101,7 +100,7 @@ func New(opts Options) Cache {
c := &cacheImpl{
opts: opts,
entries: make(map[string]*entry),
singleflight: &singleflight.Group[string, entryStatus]{},
singleflight: make(map[string]chan struct{}),
ctx: ctx,
cancel: cancel,
}
@@ -145,81 +144,87 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
return e.handle, c.releaseFunc(k, e), nil
}

c.mu.Unlock()
ch, ok := c.singleflight[k]

// We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry.
// We need to retry once since the singleflight might currently be closing the same entry.
// We don't retry more than once to avoid potentially infinite open/close loops.
for attempt := 0; attempt < 2; attempt++ {
stat, err := c.singleflight.Do(ctx, k, func(_ context.Context) (entryStatus, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return entryStatusUnspecified, errors.New("conncache: closed")
}
if e.status == entryStatusOpen {
c.mu.Unlock()
return entryStatusOpen, nil
}
c.retainEntry(k, e)
e.status = entryStatusOpening
e.since = time.Now()
e.handle = nil
e.err = nil
if ok && e.status == entryStatusClosing {
c.mu.Unlock()
<-ch
c.mu.Lock()

// Since we released the lock, need to check c.closed and e.status again.
if c.closed {
c.releaseEntry(k, e)
c.mu.Unlock()
return nil, nil, errors.New("conncache: closed")
}

if e.status == entryStatusOpen {
defer c.mu.Unlock()
if e.err != nil {
c.releaseEntry(k, e)
return nil, nil, e.err
}
return e.handle, c.releaseFunc(k, e), nil
}

ch, ok = c.singleflight[k]
}

if !ok {
c.retainEntry(k, e) // Retain again to count the goroutine's reference independently (in case ctx is cancelled while the Open continues in the background)

ch = make(chan struct{})
c.singleflight[k] = ch

e.status = entryStatusOpening
e.since = time.Now()
e.handle = nil
e.err = nil

go func() {
var handle Connection
var err error
if c.opts.OpenTimeout == 0 {
handle, err = c.opts.OpenFunc(ctx, cfg)
handle, err = c.opts.OpenFunc(c.ctx, cfg)
} else {
ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout)
handle, err = c.opts.OpenFunc(ctx, cfg)
cancel()
}

c.mu.Lock()
c.releaseEntry(k, e)
defer c.mu.Unlock()

e.status = entryStatusOpen
e.since = time.Now()
e.handle = handle
e.err = err
c.mu.Unlock()

return entryStatusOpen, nil
})
if err != nil {
// TODO: could be a caught panic. Should we handle panics?
return nil, nil, err
}

if stat != entryStatusOpen {
// TODO: Too fast
continue
}
delete(c.singleflight, k)
close(ch)

c.mu.Lock()
if e.status != entryStatusOpen {
c.releaseEntry(k, e)
c.mu.Unlock()
return nil, nil, errors.New("conncache: connection was immediately closed after being opened")
}
handle := e.handle
err = e.err
if e.err != nil {
c.releaseEntry(k, e)
c.mu.Unlock()
return nil, nil, err
}
c.mu.Unlock()
return handle, c.releaseFunc(k, e), nil
}()
}

c.mu.Lock()
c.releaseEntry(k, e)
c.mu.Unlock()

return nil, nil, errors.New("conncache: connection was closed repeatedly while trying to open it")
<-ch

c.mu.Lock()
defer c.mu.Unlock()

if e.status != entryStatusOpen {
c.releaseEntry(k, e)
return nil, nil, errors.New("conncache: connection was immediately closed after being opened")
}

if e.err != nil {
c.releaseEntry(k, e)
return nil, nil, e.err
}

return e.handle, c.releaseFunc(k, e), nil
}

func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) {
@@ -249,32 +254,26 @@ func (c *cacheImpl) Close(ctx context.Context) error {
c.mu.Unlock()

for {
if ctx.Err() != nil {
return ctx.Err()
}

c.mu.Lock()
var anyE *entry
for _, e := range c.entries {
if e.status != entryStatusClosed {
anyE = e
break
}
var anyCh chan struct{}
for _, ch := range c.singleflight {
anyCh = ch
break
}
c.mu.Unlock()

if anyE == nil {
c.mu.Unlock()
if anyCh == nil {
// all entries are closed, we can return
break
return nil
}

anyE.closedCh = make(chan struct{})
c.mu.Unlock()

<-anyE.closedCh
select {
case <-anyCh:
// continue
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

// beginClose must be called while c.mu is held.
@@ -285,51 +284,55 @@ func (c *cacheImpl) beginClose(k string, e *entry) {

c.retainEntry(k, e)

go func() {
// We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry.
// We need to retry once since the singleflight might currently be opening the same entry.
// We don't retry more than once to avoid potentially infinite open/close loops.
for attempt := 0; attempt < 2; attempt++ {
stat, _ := c.singleflight.Do(context.Background(), k, func(_ context.Context) (entryStatus, error) {
c.mu.Lock()
if e.status == entryStatusClosed {
c.mu.Unlock()
return entryStatusClosed, nil
}
e.status = entryStatusClosing
e.since = time.Now()
c.mu.Unlock()
ch, ok := c.singleflight[k]
if ok {
c.mu.Unlock()
<-ch
c.mu.Lock()

var err error
if e.handle != nil {
err = e.handle.Close()
}
if err == nil {
err = errors.New("conncache: connection closed")
}
_, ok = c.singleflight[k]
if ok {
// Probably, another goroutine started closing it. Very unlikely, it was closed and re-opened again.
// Either way, we did our part.
c.releaseEntry(k, e)
return
}

c.mu.Lock()
e.status = entryStatusClosed
e.since = time.Now()
if e.closedCh != nil {
close(e.closedCh)
}
e.handle = nil
e.err = err
c.mu.Unlock()
// Since we released the lock, need to check e.status again.
// (Doesn't need to check entryStatusClosing since we now know that the singleflight is empty.)
if e.status == entryStatusClosed {
c.releaseEntry(k, e)
return
}
}

return entryStatusClosed, nil
})
// TODO: can return err on panic in Close. Should we handle panics?
ch = make(chan struct{})
c.singleflight[k] = ch

if stat == entryStatusClosed {
break
}
e.status = entryStatusClosing
e.since = time.Now()

go func() {
var err error
if e.handle != nil {
err = e.handle.Close()
}
if err == nil {
err = errors.New("conncache: connection closed")
}

c.mu.Lock()
defer c.mu.Unlock()

e.status = entryStatusClosed
e.since = time.Now()
e.handle = nil
e.err = err

delete(c.singleflight, k)
close(ch)

c.releaseEntry(k, e)
c.mu.Unlock()
}()
}

@@ -380,7 +383,8 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() {
select {
case <-ticker.C:
c.mu.Lock()
for _, e := range c.entries {
for k := range c.singleflight {
e := c.entries[k]
if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout {
c.opts.HangingFunc(e.cfg, true)
}
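For orientation, a hypothetical caller of this cache might look roughly as follows. Only OpenFunc, OpenTimeout and HangingFunc are visible in this diff; the remaining details (other Options fields, the concrete connection type, and the zero-argument release function) are assumptions for illustration, not documented API.

// Hypothetical usage sketch based only on signatures visible in this diff.
package example

import (
	"context"
	"log"
	"time"

	"github.com/rilldata/rill/runtime/pkg/conncache"
)

func run(ctx context.Context, openConn func(ctx context.Context, dsn string) (conncache.Connection, error)) error {
	cache := conncache.New(conncache.Options{
		OpenTimeout: 10 * time.Second,
		OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) {
			return openConn(ctx, cfg.(string)) // openConn is a caller-supplied placeholder
		},
		HangingFunc: func(cfg any, open bool) {
			log.Printf("connection for %v appears to hang (open=%v)", cfg, open)
		},
	})

	handle, release, err := cache.Acquire(ctx, "dsn://example")
	if err != nil {
		return err
	}
	_ = handle // use the connection via whatever concrete type OpenFunc returned
	release()  // assumed to take no arguments; returns the entry to the idle pool

	return cache.Close(context.Background()) // waits for in-flight opens/closes, bounded by its ctx
}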
