
Commit

Fix various race conditions
begelundmuller committed Dec 14, 2023
1 parent cb7bda4 commit 4833bf0
Showing 3 changed files with 59 additions and 46 deletions.
7 changes: 4 additions & 3 deletions runtime/connection_cache.go
@@ -27,9 +27,10 @@ type cachedConnectionConfig struct {
// It also monitors for hanging connections.
func (r *Runtime) newConnectionCache() conncache.Cache {
return conncache.New(conncache.Options{
MaxConnectionsIdle: r.opts.ConnectionCacheSize,
OpenTimeout: 2 * time.Minute,
CloseTimeout: 5 * time.Minute,
MaxConnectionsIdle: r.opts.ConnectionCacheSize,
OpenTimeout: 2 * time.Minute,
CloseTimeout: 5 * time.Minute,
CheckHangingInterval: time.Minute,
OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) {
x := cfg.(cachedConnectionConfig)
return r.openAndMigrate(ctx, x)
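
The runtime now passes the hang-check interval explicitly instead of relying on a package-level default. Below is a minimal sketch of constructing a cache with the new CheckHangingInterval option; the module import path, the KeyFunc and HangingFunc signatures, and the dbConn type are assumptions inferred from call sites in this diff, not code from the commit.

// Sketch: wiring the new CheckHangingInterval option (assumed import path and
// assumed KeyFunc/HangingFunc signatures; dbConn is a stand-in Connection).
package main

import (
	"context"
	"time"

	"github.com/rilldata/rill/runtime/pkg/conncache" // assumed module path
)

type dbConn struct{ dsn string }

func (c *dbConn) Close() error { return nil }

func newCache() conncache.Cache {
	return conncache.New(conncache.Options{
		MaxConnectionsIdle:   10,
		OpenTimeout:          2 * time.Minute,
		CloseTimeout:         5 * time.Minute,
		CheckHangingInterval: time.Minute, // 0 disables the background hang checker (see New above)
		OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) {
			return &dbConn{dsn: cfg.(string)}, nil
		},
		KeyFunc: func(cfg any) string { // assumed signature
			return cfg.(string)
		},
		HangingFunc: func(cfg any, open bool) { // assumed signature: open=true for hanging opens
		},
	})
}
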
72 changes: 45 additions & 27 deletions runtime/pkg/conncache/conncache.go
@@ -47,6 +47,8 @@ type Options struct {
OpenTimeout time.Duration
// CloseTimeout is the maximum amount of time to wait for a connection to close.
CloseTimeout time.Duration
// CheckHangingInterval is the interval at which to check for hanging open/close calls.
CheckHangingInterval time.Duration
// OpenFunc opens a connection.
OpenFunc func(ctx context.Context, cfg any) (Connection, error)
// KeyFunc computes a comparable key for a connection configuration.
@@ -69,7 +71,7 @@ type cacheImpl struct {
mu sync.Mutex
entries map[string]*entry
lru *simplelru.LRU
singleflight *singleflight.Group[string, any]
singleflight *singleflight.Group[string, entryStatus]
ctx context.Context
cancel context.CancelFunc
}
@@ -99,7 +101,7 @@ func New(opts Options) Cache {
c := &cacheImpl{
opts: opts,
entries: make(map[string]*entry),
singleflight: &singleflight.Group[string, any]{},
singleflight: &singleflight.Group[string, entryStatus]{},
ctx: ctx,
cancel: cancel,
}
@@ -110,7 +112,9 @@ func New(opts Options) Cache {
panic(err)
}

go c.periodicallyCheckHangingConnections()
if opts.CheckHangingInterval != 0 {
go c.periodicallyCheckHangingConnections()
}

return c
}
@@ -143,16 +147,19 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFunc, error) {

c.mu.Unlock()

// We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry.
// We need to retry once since the singleflight might currently be closing the same entry.
// We don't retry more than once to avoid potentially infinite open/close loops.
for attempt := 0; attempt < 2; attempt++ {
_, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) {
stat, err := c.singleflight.Do(ctx, k, func(_ context.Context) (entryStatus, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, errors.New("conncache: closed")
return entryStatusUnspecified, errors.New("conncache: closed")
}
if e.status == entryStatusOpen {
c.mu.Unlock()
return nil, nil
return entryStatusOpen, nil
}
c.retainEntry(k, e)
e.status = entryStatusOpening
@@ -179,27 +186,40 @@
e.err = err
c.mu.Unlock()

return nil, nil
return entryStatusOpen, nil
})
if err != nil {
// TODO: could be a caught panic. Should we handle panics?
return nil, nil, err
}

if stat != entryStatusOpen {
// TODO: Too fast
continue
}

c.mu.Lock()
if e.status == entryStatusOpen {
break
if e.status != entryStatusOpen {
c.releaseEntry(k, e)
c.mu.Unlock()
return nil, nil, errors.New("conncache: connection was immediately closed after being opened")
}
handle := e.handle
err = e.err
if e.err != nil {
c.releaseEntry(k, e)
c.mu.Unlock()
return nil, nil, err
}
c.mu.Unlock()
return handle, c.releaseFunc(k, e), nil
}

defer c.mu.Unlock()
c.mu.Lock()
c.releaseEntry(k, e)
c.mu.Unlock()

if e.err != nil {
c.releaseEntry(k, e)
return nil, nil, e.err
}
return e.handle, c.releaseFunc(k, e), nil
return nil, nil, errors.New("conncache: connection was closed repeatedly while trying to open it")
}

func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) {
@@ -266,12 +286,15 @@ func (c *cacheImpl) beginClose(k string, e *entry) {
c.retainEntry(k, e)

go func() {
// We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry.
// We need to retry once since the singleflight might currently be opening the same entry.
// We don't retry more than once to avoid potentially infinite open/close loops.
for attempt := 0; attempt < 2; attempt++ {
_, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) {
stat, _ := c.singleflight.Do(context.Background(), k, func(_ context.Context) (entryStatus, error) {
c.mu.Lock()
if e.status == entryStatusClosed {
c.mu.Unlock()
return nil, nil
return entryStatusClosed, nil
}
e.status = entryStatusClosing
e.since = time.Now()
@@ -295,16 +318,13 @@
e.err = err
c.mu.Unlock()

return nil, nil
return entryStatusClosed, nil
})
// TODO: can return err on panic in Close. Should we handle panics?

c.mu.Lock()
if e.status == entryStatusClosed {
c.mu.Unlock()
if stat == entryStatusClosed {
break
}
c.mu.Unlock()
}

c.mu.Lock()
@@ -352,21 +372,19 @@ func (c *cacheImpl) releaseFunc(key string, e *entry) ReleaseFunc {
}
}

var checkHangingInterval = time.Minute

func (c *cacheImpl) periodicallyCheckHangingConnections() {
ticker := time.NewTicker(checkHangingInterval)
ticker := time.NewTicker(c.opts.CheckHangingInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.mu.Lock()
for _, e := range c.entries {
if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) >= c.opts.OpenTimeout {
if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout {
c.opts.HangingFunc(e.cfg, true)
}
if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) >= c.opts.CloseTimeout {
if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) > c.opts.CloseTimeout {
c.opts.HangingFunc(e.cfg, false)
}
}
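
The Acquire path above uses a single singleflight key for both opening and closing an entry, retrying once if it raced with a close of the same key; callers only ever see the handle and a release callback. A minimal usage sketch of that caller-facing API follows, continuing the sketch after the first file above (same assumed package and imports); the useCache helper and the string cfg are illustrative, not part of the commit.

// Sketch: acquiring, releasing, evicting, and closing via the exported API.
func useCache(ctx context.Context, c conncache.Cache) error {
	conn, release, err := c.Acquire(ctx, "dsn://example") // cfg is untyped (any); a string here
	if err != nil {
		return err
	}
	// Use the connection; callers type-assert to the concrete type returned by OpenFunc.
	_ = conn
	release() // lets the entry go idle and become eligible for LRU eviction

	// Evict all entries matching a predicate; connections close in the background.
	c.EvictWhere(func(cfg any) bool { return true })

	// Shut the cache down.
	return c.Close(ctx)
}
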
26 changes: 10 additions & 16 deletions runtime/pkg/conncache/conncache_test.go
@@ -14,15 +14,11 @@ import (
type mockConn struct {
cfg string
closeDelay time.Duration
closeHang bool
closeCalled bool
closeCalled atomic.Bool
}

func (c *mockConn) Close() error {
c.closeCalled = true
if c.closeHang {
select {}
}
c.closeCalled.Store(true)
time.Sleep(c.closeDelay)
return nil
}
@@ -71,7 +67,7 @@ func TestBasic(t *testing.T) {
r()
}
time.Sleep(time.Second)
require.Equal(t, true, m1.(*mockConn).closeCalled)
require.Equal(t, true, m1.(*mockConn).closeCalled.Load())

// Close cache
require.NoError(t, c.Close(context.Background()))
@@ -145,7 +141,7 @@ func TestOpenDuringClose(t *testing.T) {
c.EvictWhere(func(cfg any) bool { return true })
// closeCalled is set before mockConn.Close hangs, but it will take 1s to actually close
time.Sleep(100 * time.Millisecond)
require.True(t, m1.(*mockConn).closeCalled)
require.True(t, m1.(*mockConn).closeCalled.Load())

// Open again, check it takes ~1s to do so
start := time.Now()
@@ -182,7 +178,7 @@ func TestCloseInUse(t *testing.T) {
// Evict it, check it's closed even though still in use (r1 not called)
c.EvictWhere(func(cfg any) bool { return true })
time.Sleep(time.Second)
require.Equal(t, true, m1.(*mockConn).closeCalled)
require.Equal(t, true, m1.(*mockConn).closeCalled.Load())

// Open "foo" again, check it opens a new one
m2, r2, err := c.Acquire(context.Background(), "foo")
@@ -196,16 +192,14 @@
}

func TestHanging(t *testing.T) {
// Make it check for hanging conns every 100ms
checkHangingInterval = 100 * time.Millisecond

hangingOpens := atomic.Int64{}
hangingCloses := atomic.Int64{}

c := New(Options{
MaxConnectionsIdle: 2,
OpenTimeout: 100 * time.Millisecond,
CloseTimeout: 100 * time.Millisecond,
MaxConnectionsIdle: 2,
OpenTimeout: 100 * time.Millisecond,
CloseTimeout: 100 * time.Millisecond,
CheckHangingInterval: 100 * time.Millisecond,
OpenFunc: func(ctx context.Context, cfg any) (Connection, error) {
time.Sleep(time.Second)
return &mockConn{
@@ -234,6 +228,6 @@
// Evict it, check it's closed even though still in use (r1 not called)
c.EvictWhere(func(cfg any) bool { return true })
time.Sleep(time.Second)
require.Equal(t, true, m1.(*mockConn).closeCalled)
require.Equal(t, true, m1.(*mockConn).closeCalled.Load())
require.GreaterOrEqual(t, hangingCloses.Load(), int64(1))
}
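
The test changes above replace the plain closeCalled bool (and the closeHang flag) with sync/atomic's atomic.Bool, since the flag is written by the cache's background close goroutine and read by the test goroutine; similarly, the hang-check interval is now injected through Options instead of mutating the package-level checkHangingInterval variable shared across tests. A standalone sketch of the atomic-flag pattern is below; the slowCloser type is hypothetical.

// Sketch: a cross-goroutine flag needs atomic.Bool (or a mutex) to pass `go test -race`.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type slowCloser struct {
	closeDelay  time.Duration
	closeCalled atomic.Bool
}

func (c *slowCloser) Close() error {
	c.closeCalled.Store(true) // set before the slow part; read concurrently via Load
	time.Sleep(c.closeDelay)
	return nil
}

func main() {
	c := &slowCloser{closeDelay: time.Second}
	go c.Close()                       // closer runs in the background
	time.Sleep(100 * time.Millisecond) // give the goroutine time to set the flag
	fmt.Println(c.closeCalled.Load())  // prints true, without a data race
}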
