This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

retry when readers are full; new default maxreaders: 254. (#6)
raulk authored Jan 25, 2021
1 parent 34a1629 commit 169df0e
Showing 2 changed files with 145 additions and 21 deletions.
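
The commit makes two user-visible changes to Options: MaxReaders now defaults to 254, and a new RetryDelay (default 10ms, jittered by +/-20%) controls how long operations back off when LMDB reports MDB_READERS_FULL instead of failing outright. Below is a minimal, hypothetical usage sketch, not part of the commit; the import path and the Path field are assumptions, while MaxReaders and RetryDelay come from this diff.

package main

import (
	"log"
	"time"

	lmdbbs "github.com/filecoin-project/go-bs-lmdb" // assumed import path
)

func main() {
	// Leaving MaxReaders/RetryDelay at zero picks up the new defaults
	// (254 readers, 10ms jittered retry delay).
	bs, err := lmdbbs.Open(&lmdbbs.Options{
		Path:       "/tmp/blocks",         // assumed field: where the LMDB environment lives.
		MaxReaders: 512,                   // override the new default of 254.
		RetryDelay: 20 * time.Millisecond, // base backoff when MDB_READERS_FULL is hit.
	})
	if err != nil {
		log.Fatal(err)
	}
	defer bs.Close()
}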
139 changes: 118 additions & 21 deletions blockstore.go
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"math"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
@@ -31,6 +33,21 @@ var (
// used if the supplied value is zero or invalid. Unless modified, this
// value is 4GiB.
DefaultMmapGrowthStepMax = int64(4 << 30) // maximum step size is 4GiB at a time.

// DefaultMaxReaders is the maximum number of reader slots used when the
// Options do not provide one. It is 254, not 256, following the note from the
// LMDB author that the original default of 126 was chosen because it fit
// exactly into 8KiB along with a couple of mutexes.
//
// http://www.lmdb.tech/doc/group__readers.html#gadff1f7b4d4626610a8d616e0c6dbbea4
DefaultMaxReaders = 254

// DefaultRetryDelay is the default retry delay for reattempting transactions
// that errored with MDB_READERS_FULL.
DefaultRetryDelay = 10 * time.Millisecond

// RetryJitter is the jitter to apply to the delay interval. Default: 20%.
RetryJitter = 0.2
)

var log = logging.Logger("lmdbbs")
@@ -58,8 +75,10 @@ type Blockstore struct {
// opts are the options for this blockstore.
opts *Options

pagesize int64 // the memory page size reported by the OS.
closed int32
retryDelay time.Duration
retryJitterBound time.Duration
pagesize int64 // the memory page size reported by the OS.
closed int32
}

var (
@@ -95,8 +114,15 @@ type Options struct {
// --- DB sizing fields. --- //

// MaxReaders is the maximum amount of concurrent reader slots that exist
// in the lock table.
// in the lock table. By default 254.
MaxReaders int

// RetryDelay is a fixed delay to wait before a transaction that errored
// with MDB_READERS_FULL will be reattempted. Contention due to incorrect
// sizing of MaxReaders will thus lead to a system slowdown via
// backpressure, instead of an outright error.
// Jittered by RetryJitter (default: +/-20%).
RetryDelay time.Duration
}

func Open(opts *Options) (*Blockstore, error) {
@@ -154,11 +180,16 @@ func Open(opts *Options) (*Blockstore, error) {
if err = env.SetMapSize(opts.InitialMmapSize); err != nil {
return nil, fmt.Errorf("failed to set LMDB map size: %w", err)
}
// Use the default max readers (126) unless a value is passed in the options.
if opts.MaxReaders != 0 {
if err = env.SetMaxReaders(opts.MaxReaders); err != nil {
return nil, fmt.Errorf("failed to set LMDB max readers: %w", err)
}
// Use the default max readers (254) unless a value is passed in the options.
if opts.MaxReaders == 0 {
opts.MaxReaders = DefaultMaxReaders
}
if err = env.SetMaxReaders(opts.MaxReaders); err != nil {
return nil, fmt.Errorf("failed to set LMDB max readers: %w", err)
}
// Use a default retry delay if none is set.
if opts.RetryDelay == 0 {
opts.RetryDelay = DefaultRetryDelay
}

// Environment options:
@@ -237,10 +268,12 @@ func Open(opts *Options) (*Blockstore, error) {
}

bs := &Blockstore{
env: env,
opts: opts,
dedupGrow: new(sync.Once),
pagesize: int64(pagesize),
env: env,
opts: opts,
dedupGrow: new(sync.Once),
pagesize: int64(pagesize),
retryDelay: opts.RetryDelay,
retryJitterBound: time.Duration(float64(opts.RetryDelay) * RetryJitter),
}
err = env.Update(func(txn *lmdb.Txn) (err error) {
bs.db, err = txn.OpenRoot(lmdb.Create)
@@ -284,19 +317,27 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
defer b.oplock.RUnlock()

var val []byte

Retry:
err := b.env.View(func(txn *lmdb.Txn) error {
v, err := txn.Get(b.db, cid.Hash())
if err == nil {
val = v
}
return err
})

switch {
case err == nil:
return blocks.NewBlockWithCid(val, cid)
case lmdb.IsNotFound(err) || lmdb.IsErrno(err, lmdb.BadValSize):
// lmdb returns badvalsize with nil keys.
err = blockstore.ErrNotFound
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("get")
b.oplock.RLock()
goto Retry
}
return nil, err
}
@@ -305,6 +346,7 @@ func (b *Blockstore) View(cid cid.Cid, callback func([]byte) error) error {
b.oplock.RLock()
defer b.oplock.RUnlock()

Retry:
err := b.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
v, err := txn.Get(b.db, cid.Hash())
@@ -314,11 +356,15 @@ func (b *Blockstore) View(cid cid.Cid, callback func([]byte) error) error {
return err
})
switch {
case err == nil:
return nil // shortcircuit the happy path with no comparisons.
case err == nil: // shortcircuit the happy path with no comparisons.
case lmdb.IsNotFound(err) || lmdb.IsErrno(err, lmdb.BadValSize):
// lmdb returns badvalsize with nil keys.
err = blockstore.ErrNotFound
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("view")
b.oplock.RLock()
goto Retry
}
return err
}
@@ -327,6 +373,7 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
b.oplock.RLock()
defer b.oplock.RUnlock()

Retry:
size := -1
err := b.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
@@ -336,12 +383,18 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
}
return err
})

switch {
case err == nil:
return size, nil // shortcircuit the happy path with no comparisons.
case err == nil: // shortcircuit happy path.
case lmdb.IsNotFound(err) || lmdb.IsErrno(err, lmdb.BadValSize):
err = blockstore.ErrNotFound
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("get size")
b.oplock.RLock()
goto Retry
}

return size, err
}

@@ -357,7 +410,10 @@ Retry:
}
return err
})
if lmdb.IsMapFull(err) {

switch {
case err == nil: // shortcircuit happy path.
case lmdb.IsMapFull(err):
o := b.dedupGrow // take the deduplicator under the lock.
b.oplock.RUnlock() // drop the concurrent lock.
var err error
Expand All @@ -367,7 +423,13 @@ Retry:
}
b.oplock.RLock() // reclaim the concurrent lock.
goto Retry
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("put")
b.oplock.RLock()
goto Retry
}

return err
}

@@ -386,7 +448,10 @@ Retry:
}
return nil
})
if lmdb.IsMapFull(err) {

switch {
case err == nil: // shortcircuit happy path.
case lmdb.IsMapFull(err):
o := b.dedupGrow // take the deduplicator under the lock.
b.oplock.RUnlock() // drop the concurrent lock.
var err error
Expand All @@ -396,7 +461,13 @@ Retry:
}
b.oplock.RLock() // reclaim the concurrent lock.
goto Retry
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("put many")
b.oplock.RLock()
goto Retry
}

return err
}

@@ -408,10 +479,10 @@ Retry:
err := b.env.Update(func(txn *lmdb.Txn) error {
return txn.Del(b.db, cid.Hash(), nil)
})
if lmdb.IsNotFound(err) {
switch {
case err == nil || lmdb.IsNotFound(err): // shortcircuit happy path.
return nil
}
if lmdb.IsMapFull(err) {
case lmdb.IsMapFull(err):
o := b.dedupGrow // take the deduplicator under the lock.
b.oplock.RUnlock() // drop the concurrent lock.
var err error
Expand All @@ -421,6 +492,11 @@ Retry:
}
b.oplock.RLock() // reclaim the concurrent lock.
goto Retry
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("delete")
b.oplock.RLock()
goto Retry
}
return err
}
@@ -462,6 +538,7 @@ func (c *cursor) run() {
default:
}

Retry:
var notifyClosed chan struct{}
err := c.b.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
@@ -499,12 +576,20 @@ func (c *cursor) run() {
case c.outCh <- it:
case notifyClosed = <-c.interruptCh:
return errInterrupted
case <-c.ctx.Done():
return nil
}
c.last = it
}
return nil
})

if lmdb.IsErrno(err, lmdb.ReadersFull) {
log.Warnf("cursor encountered MDB_READERS_FULL; waiting %s", c.b.retryDelay)
time.Sleep(c.b.retryDelay)
goto Retry
}

if err == errInterrupted {
close(notifyClosed)
return
@@ -599,6 +684,18 @@ func (b *Blockstore) grow() error {
return nil
}

func (b *Blockstore) sleep(opname string) {
r := rand.Int63n(int64(b.retryJitterBound))
// we don't need the jitter to be perfectly uniform, just cheap to compute:
// add it when the draw is even, subtract it when odd.
if r%2 == 1 {
r = -r
}
d := b.retryDelay + time.Duration(r)
log.Warnf(opname+" encountered MDB_READERS_FULL; waiting %s", d)
time.Sleep(d)
}

func roundup(value, multiple int64) int64 {
return int64(math.Ceil(float64(value)/float64(multiple))) * multiple
}
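
An aside for readers of the sleep helper above (not part of the diff): r is drawn uniformly from [0, retryJitterBound) and its sign is flipped on odd draws, so the effective delay falls in the open interval (retryDelay - jitterBound, retryDelay + jitterBound); with the defaults that is roughly 8ms to 12ms. A standalone sketch of that calculation, using only values defined earlier in this file:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredDelay mirrors the arithmetic of (*Blockstore).sleep for illustration.
func jitteredDelay(base time.Duration, jitter float64) time.Duration {
	bound := time.Duration(float64(base) * jitter) // 2ms for 10ms at 20%.
	r := rand.Int63n(int64(bound))                 // r in [0, bound).
	if r%2 == 1 {                                  // cheap sign flip: subtract on odd draws.
		r = -r
	}
	return base + time.Duration(r)
}

func main() {
	for i := 0; i < 3; i++ {
		// With the defaults this prints durations strictly between 8ms and 12ms.
		fmt.Println(jitteredDelay(10*time.Millisecond, 0.2))
	}
}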
27 changes: 27 additions & 0 deletions blockstore_test.go
@@ -9,9 +9,11 @@ import (
"os"
"sync"
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logger "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multihash"
bstest "github.com/raulk/go-bs-tests"
@@ -222,6 +224,31 @@ func TestGrowUnderConcurrency(t *testing.T) {
wg.Wait()
}

func TestRetryWhenReadersFull(t *testing.T) {
opts := Options{
MaxReaders: 1, // single reader to induce contention.
}

bs, _ := newBlockstore(opts)(t)
defer bs.(io.Closer).Close()

putEntries(t, bs, 1*1024, 1*1024)

// this context cancels in 2 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

ch, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
<-ch // consume one element, then leave it hanging.

// this get will block until the cursor has finished.
start := time.Now()
_, err = bs.Get(randomCID())
require.Equal(t, blockstore.ErrNotFound, err)
require.GreaterOrEqual(t, time.Since(start).Nanoseconds(), 1*time.Second.Nanoseconds())
}

func putEntries(t *testing.T, bs bstest.Blockstore, count int, size int) {
for i := 0; i < count; i++ {
b := make([]byte, size)

