Make tests pass
begelundmuller committed Dec 13, 2023
1 parent bff5883 commit 6b38206
Showing 2 changed files with 64 additions and 27 deletions.
87 changes: 61 additions & 26 deletions runtime/pkg/conncache/conncache.go
@@ -2,7 +2,7 @@ package conncache

import (
"context"
"fmt"
"errors"
"sync"
"time"

@@ -55,6 +55,14 @@ type Options struct {
HangingFunc func(cfg any, open bool)
}

var _ Cache = (*cacheImpl)(nil)

// cacheImpl implements Cache.
// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time.
// It also uses an LRU to pool unused connections and eventually close them.
// The implementation heavily depends on implementation details of singleflight.Group.
// Notably, it will in different places invoke the singleflight with different callbacks for the same key.
// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled.
type cacheImpl struct {
opts Options
closed bool
@@ -67,20 +75,21 @@ type cacheImpl struct {
}
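The doc comment above hinges on per-key coalescing: every open and close for a given connection funnels through the same singleflight key, so at most one such action runs at a time. A minimal sketch of that coalescing idea, using the stock golang.org/x/sync/singleflight rather than the context-aware variant this package uses (whose Do also runs the callback even when the caller's ctx is already cancelled); the key "db1" and the fake open are illustrative placeholders, not part of this commit:

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var wg sync.WaitGroup

	// Five concurrent "acquires" of the same key coalesce into a single slow open.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, _, shared := g.Do("db1", func() (interface{}, error) {
				time.Sleep(50 * time.Millisecond) // stand-in for a slow connection open
				return "connection for db1", nil
			})
			fmt.Println(v, "coalesced:", shared)
		}()
	}
	wg.Wait()
}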

type entry struct {
cfg any
refs int
status entryStatus
since time.Time
handle Connection
err error
cfg any
refs int
status entryStatus
since time.Time
closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called.
handle Connection
err error
}

type entryStatus int

const (
entryStatusUnspecified entryStatus = iota
entryStatusOpening
entryStatusOpen
entryStatusOpen // Also used for cases where open errored (i.e. entry.err != nil)
entryStatusClosing
entryStatusClosed
)
@@ -112,7 +121,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, nil, fmt.Errorf("conncache: closed")
return nil, nil, errors.New("conncache: closed")
}

e, ok := c.entries[k]
@@ -126,6 +135,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
if e.status == entryStatusOpen {
defer c.mu.Unlock()
if e.err != nil {
c.releaseEntry(k, e)
return nil, nil, e.err
}
return e.handle, c.releaseFunc(k, e), nil
@@ -136,6 +146,14 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
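// Up to two attempts: the first Do call can coalesce onto an in-flight close of the same key and return without opening; the second attempt then runs the open callback.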
for attempt := 0; attempt < 2; attempt++ {
_, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, errors.New("conncache: closed")
}
if e.status == entryStatusOpen {
c.mu.Unlock()
return nil, nil
}
c.retainEntry(k, e)
e.status = entryStatusOpening
e.since = time.Now()
@@ -164,7 +182,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
return nil, nil
})
if err != nil {
// TODO: if err is not ctx.Err(), it's a panic. Should we handle panics?
// TODO: could be a caught panic. Should we handle panics?
return nil, nil, err
}

@@ -178,6 +196,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu
defer c.mu.Unlock()

if e.err != nil {
c.releaseEntry(k, e)
return nil, nil, e.err
}
return e.handle, c.releaseFunc(k, e), nil
@@ -197,7 +216,7 @@ func (c *cacheImpl) Close(ctx context.Context) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return fmt.Errorf("conncache: already closed")
return errors.New("conncache: already closed")
}
c.closed = true

@@ -207,38 +226,40 @@ func (c *cacheImpl) Close(ctx context.Context) error {
c.beginClose(k, e)
}

// TODO: Purge? I don't think so.

c.mu.Unlock()

for {
if ctx.Err() != nil {
return ctx.Err()
}

c.mu.Lock()
var anyK string
var anyE *entry
for k, e := range c.entries {
anyK = k
anyE = e
break
for _, e := range c.entries {
if e.status != entryStatusClosed {
anyE = e
break
}
}
c.mu.Unlock()

if anyE == nil {
// c.entries is empty, we can return
c.mu.Unlock()
// all entries are closed, we can return
break
}

// TODO: What if this blocks before the close? Probably better to wait for a close channel on the entry.
_, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (any, error) {
return nil, nil
})
anyE.closedCh = make(chan struct{})
c.mu.Unlock()

<-anyE.closedCh
}

return nil
}
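The loop above replaces the old singleflight-based wait (note the removed TODO about waiting on a close channel): Close registers closedCh on a not-yet-closed entry while holding the mutex, and beginClose closes that channel once the connection has actually shut down. A self-contained sketch of the same mutex-plus-completion-channel pattern, with hypothetical names (task, finish, wait) that are not part of this commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// task mirrors an entry: a done flag and an optional completion channel, both guarded by a mutex.
type task struct {
	mu     sync.Mutex
	done   bool
	doneCh chan struct{} // set by the waiter, closed by the finisher
}

// finish plays the role of beginClose: do the slow work, then signal any registered waiter.
func (t *task) finish() {
	time.Sleep(100 * time.Millisecond) // simulate a slow connection close
	t.mu.Lock()
	t.done = true
	if t.doneCh != nil {
		close(t.doneCh)
	}
	t.mu.Unlock()
}

// wait plays the role of Close: register a channel under the mutex, then block until it is closed.
func (t *task) wait() {
	t.mu.Lock()
	if t.done {
		t.mu.Unlock()
		return
	}
	t.doneCh = make(chan struct{})
	t.mu.Unlock()
	<-t.doneCh
}

func main() {
	t := &task{}
	go t.finish()
	t.wait()
	fmt.Println("closed")
}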

// beginClose must be called while c.mu is held.
func (c *cacheImpl) beginClose(k string, e *entry) {
if e.status != entryStatusOpening && e.status != entryStatusOpen {
if e.status == entryStatusClosing || e.status == entryStatusClosed {
return
}

@@ -248,15 +269,28 @@ func (c *cacheImpl) beginClose(k string, e *entry) {
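// As in Acquire, the first Do call can coalesce onto an in-flight open of the same key; the retry then runs the close callback, and the status check below breaks out once the entry is closed.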
for attempt := 0; attempt < 2; attempt++ {
_, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) {
c.mu.Lock()
if e.status == entryStatusClosed {
c.mu.Unlock()
return nil, nil
}
e.status = entryStatusClosing
e.since = time.Now()
c.mu.Unlock()

err := e.handle.Close()
var err error
if e.handle != nil {
err = e.handle.Close()
}
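// Force a non-nil error even when the close succeeded (or there was no handle), so e.err below never reads as a usable connection after the entry is closed.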
if err == nil {
err = errors.New("conncache: connection closed")
}

c.mu.Lock()
e.status = entryStatusClosed
e.since = time.Now()
if e.closedCh != nil {
close(e.closedCh)
}
e.handle = nil
e.err = err
c.mu.Unlock()
@@ -267,6 +301,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) {

c.mu.Lock()
if e.status == entryStatusClosed {
c.mu.Unlock()
break
}
c.mu.Unlock()
4 changes: 3 additions & 1 deletion runtime/pkg/conncache/conncache_test.go
@@ -70,6 +70,7 @@ func TestBasic(t *testing.T) {
require.Equal(t, int64(1+i+1), opens.Load())
r()
}
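// The evicted connection is closed in the background; give the close time to finish before asserting.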
time.Sleep(time.Second)
require.Equal(t, true, m1.(*mockConn).closeCalled)

// Close cache
@@ -142,7 +143,8 @@ func TestOpenDuringClose(t *testing.T) {

// Evict it so it starts closing
c.EvictWhere(func(cfg any) bool { return true })
// closeCalled is set immediately, but it will take 1s to actually close
// closeCalled is set before mockConn.Close hangs, but it will take 1s to actually close
time.Sleep(100 * time.Millisecond)
require.True(t, m1.(*mockConn).closeCalled)

// Open again, check it takes ~1s to do so