From 6b382069262ead3ffc84bee91ccd91690812bae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 13 Dec 2023 21:57:34 +0100 Subject: [PATCH] Make tests pass --- runtime/pkg/conncache/conncache.go | 87 +++++++++++++++++-------- runtime/pkg/conncache/conncache_test.go | 4 +- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index ff76b706546..26498e082bc 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -2,7 +2,7 @@ package conncache import ( "context" - "fmt" + "errors" "sync" "time" @@ -55,6 +55,14 @@ type Options struct { HangingFunc func(cfg any, open bool) } +var _ Cache = (*cacheImpl)(nil) + +// cacheImpl implements Cache. +// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time. +// It also uses an LRU to pool unused connections and eventually close them. +// The implementation heavily depends on implementation details of singleflight.Group. +// Notably, it will in different places invoke the singleflight with different callbacks for the same key. +// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled. type cacheImpl struct { opts Options closed bool @@ -67,12 +75,13 @@ type cacheImpl struct { } type entry struct { - cfg any - refs int - status entryStatus - since time.Time - handle Connection - err error + cfg any + refs int + status entryStatus + since time.Time + closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called. + handle Connection + err error } type entryStatus int @@ -80,7 +89,7 @@ type entryStatus int const ( entryStatusUnspecified entryStatus = iota entryStatusOpening - entryStatusOpen + entryStatusOpen // Also used for cases where open errored (i.e. entry.err != nil) entryStatusClosing entryStatusClosed ) @@ -112,7 +121,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu c.mu.Lock() if c.closed { c.mu.Unlock() - return nil, nil, fmt.Errorf("conncache: closed") + return nil, nil, errors.New("conncache: closed") } e, ok := c.entries[k] @@ -126,6 +135,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu if e.status == entryStatusOpen { defer c.mu.Unlock() if e.err != nil { + c.releaseEntry(k, e) return nil, nil, e.err } return e.handle, c.releaseFunc(k, e), nil @@ -136,6 +146,14 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu for attempt := 0; attempt < 2; attempt++ { _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) { c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil, errors.New("conncache: closed") + } + if e.status == entryStatusOpen { + c.mu.Unlock() + return nil, nil + } c.retainEntry(k, e) e.status = entryStatusOpening e.since = time.Now() @@ -164,7 +182,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu return nil, nil }) if err != nil { - // TODO: if err is not ctx.Err(), it's a panic. Should we handle panics? + // TODO: could be a caught panic. Should we handle panics? return nil, nil, err } @@ -178,6 +196,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu defer c.mu.Unlock() if e.err != nil { + c.releaseEntry(k, e) return nil, nil, e.err } return e.handle, c.releaseFunc(k, e), nil @@ -197,7 +216,7 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.mu.Lock() if c.closed { c.mu.Unlock() - return fmt.Errorf("conncache: already closed") + return errors.New("conncache: already closed") } c.closed = true @@ -207,30 +226,32 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.beginClose(k, e) } - // TODO: Purge? I don't think so. - c.mu.Unlock() for { + if ctx.Err() != nil { + return ctx.Err() + } + c.mu.Lock() - var anyK string var anyE *entry - for k, e := range c.entries { - anyK = k - anyE = e - break + for _, e := range c.entries { + if e.status != entryStatusClosed { + anyE = e + break + } } - c.mu.Unlock() if anyE == nil { - // c.entries is empty, we can return + c.mu.Unlock() + // all entries are closed, we can return break } - // TODO: What if this blocks before the close? Probably better to wait for a close channel on the entry. - _, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (any, error) { - return nil, nil - }) + anyE.closedCh = make(chan struct{}) + c.mu.Unlock() + + <-anyE.closedCh } return nil @@ -238,7 +259,7 @@ func (c *cacheImpl) Close(ctx context.Context) error { // beginClose must be called while c.mu is held. func (c *cacheImpl) beginClose(k string, e *entry) { - if e.status != entryStatusOpening && e.status != entryStatusOpen { + if e.status == entryStatusClosing || e.status == entryStatusClosed { return } @@ -248,15 +269,28 @@ func (c *cacheImpl) beginClose(k string, e *entry) { for attempt := 0; attempt < 2; attempt++ { _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) { c.mu.Lock() + if e.status == entryStatusClosed { + c.mu.Unlock() + return nil, nil + } e.status = entryStatusClosing e.since = time.Now() c.mu.Unlock() - err := e.handle.Close() + var err error + if e.handle != nil { + err = e.handle.Close() + } + if err == nil { + err = errors.New("conncache: connection closed") + } c.mu.Lock() e.status = entryStatusClosed e.since = time.Now() + if e.closedCh != nil { + close(e.closedCh) + } e.handle = nil e.err = err c.mu.Unlock() @@ -267,6 +301,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) { c.mu.Lock() if e.status == entryStatusClosed { + c.mu.Unlock() break } c.mu.Unlock() diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go index 535675aa8bd..a749d17d12e 100644 --- a/runtime/pkg/conncache/conncache_test.go +++ b/runtime/pkg/conncache/conncache_test.go @@ -70,6 +70,7 @@ func TestBasic(t *testing.T) { require.Equal(t, int64(1+i+1), opens.Load()) r() } + time.Sleep(time.Second) require.Equal(t, true, m1.(*mockConn).closeCalled) // Close cache @@ -142,7 +143,8 @@ func TestOpenDuringClose(t *testing.T) { // Evict it so it starts closing c.EvictWhere(func(cfg any) bool { return true }) - // closeCalled is set immediately, but it will take 1s to actually close + // closeCalled is set before mockConn.Close hangs, but it will take 1s to actually close + time.Sleep(100 * time.Millisecond) require.True(t, m1.(*mockConn).closeCalled) // Open again, check it takes ~1s to do so