Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose pebble open options, add more compact metrics #306

Merged
merged 19 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r

// NewPebbleDBDatabase creates a persistent key-value database without a freezer
// moving immutable chain segments into cold storage.
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral)
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -399,6 +399,8 @@ type OpenOptions struct {
// Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of
// a crash is not important. This option should typically be used in tests.
Ephemeral bool

PebbleExtraOptions *pebble.ExtraOptions
}

// openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble.
Expand All @@ -420,15 +422,15 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) {
}
if o.Type == dbPebble || existingDb == dbPebble {
log.Info("Using pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}
if o.Type == dbLeveldb || existingDb == dbLeveldb {
log.Info("Using leveldb as the backing database")
return NewLevelDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly)
}
// No pre-existing database, no user-requested one either. Default to Pebble.
log.Info("Defaulting to pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}

// Open opens both a disk-based key-value database such as leveldb or pebble, but also
Expand Down
35 changes: 35 additions & 0 deletions ethdb/pebble/extraoptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pebble

import "time"

type ExtraOptions struct {
BytesPerSync int
L0CompactionFileThreshold int
L0CompactionThreshold int
L0StopWritesThreshold int
LBaseMaxBytes int64
MemTableStopWritesThreshold int
MaxConcurrentCompactions func() int
DisableAutomaticCompactions bool
WALBytesPerSync int
WALDir string
WALMinSyncInterval func() time.Duration
TargetByteDeletionRate int
Experimental ExtraOptionsExperimental
Levels []ExtraLevelOptions
}

type ExtraOptionsExperimental struct {
L0CompactionConcurrency int
CompactionDebtConcurrency uint64
ReadCompactionRate int64
ReadSamplingMultiplier int64
MaxWriterConcurrency int
ForceWriterParallelism bool
}

type ExtraLevelOptions struct {
BlockSize int
IndexBlockSize int
TargetFileSize int64
}
151 changes: 136 additions & 15 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated

compDebtGauge metrics.Gauge
compInProgressGauge metrics.Gauge

commitCountMeter metrics.Meter
commitTotalDurationMeter metrics.Meter
commitSemaphoreWaitMeter metrics.Meter
commitMemTableWriteStallMeter metrics.Meter
commitL0ReadAmpWriteStallMeter metrics.Meter
commitWALRotationMeter metrics.Meter
commitWaitMeter metrics.Meter

commitCount atomic.Int64
commitTotalDuration atomic.Int64
commitSemaphoreWait atomic.Int64
commitMemTableWriteStall atomic.Int64
commitL0ReadAmpWriteStall atomic.Int64
commitWALRotation atomic.Int64
commitWait atomic.Int64

levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels

quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
Expand Down Expand Up @@ -137,7 +156,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {

// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) {
if extraOptions == nil {
extraOptions = &ExtraOptions{}
}
if extraOptions.MemTableStopWritesThreshold <= 0 {
extraOptions.MemTableStopWritesThreshold = 2
}
if extraOptions.MaxConcurrentCompactions == nil {
extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() }
}
var levels []pebble.LevelOptions
if len(extraOptions.Levels) == 0 {
levels = []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
}
} else {
for _, level := range extraOptions.Levels {
levels = append(levels, pebble.LevelOptions{
BlockSize: level.BlockSize,
IndexBlockSize: level.IndexBlockSize,
TargetFileSize: level.TargetFileSize,
FilterPolicy: bloom.FilterPolicy(10),
})
}
}

// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
Expand All @@ -162,7 +212,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e

// Two memory tables is configured which is identical to leveldb,
// including a frozen memory table and another live one.
memTableLimit := 2
memTableLimit := extraOptions.MemTableStopWritesThreshold
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit

// The memory table size is currently capped at maxMemTableSize-1 due to a
Expand Down Expand Up @@ -200,19 +250,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e

// The default compaction concurrency(1 thread),
// Here use all available CPUs for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions,

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
},
// Per-level extraOptions. Options for at least one level must be specified. The
// extraOptions for the last level are used for all subsequent levels.
Levels: levels,
ReadOnly: readonly,
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
Expand All @@ -221,11 +263,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble

BytesPerSync: extraOptions.BytesPerSync,
L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold,
L0CompactionThreshold: extraOptions.L0CompactionThreshold,
L0StopWritesThreshold: extraOptions.L0StopWritesThreshold,
LBaseMaxBytes: extraOptions.LBaseMaxBytes,
DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions,
WALBytesPerSync: extraOptions.WALBytesPerSync,
WALDir: extraOptions.WALDir,
WALMinSyncInterval: extraOptions.WALMinSyncInterval,
TargetByteDeletionRate: extraOptions.TargetByteDeletionRate,
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1

if opt.Experimental.ReadSamplingMultiplier != 0 {
opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier
}
opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency
opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency
opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate
opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency
opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism

// Open the db and recover any potential corruptions
innerDB, err := pebble.Open(file, opt)
if err != nil {
Expand All @@ -247,6 +309,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
db.seekCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/seek", nil)
db.manualMemAllocGauge = metrics.GetOrRegisterGauge(namespace+"memory/manualalloc", nil)

db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil)
db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil)

db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil)
db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil)
db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil)
db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil)
db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil)
db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil)
db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
Expand Down Expand Up @@ -459,6 +532,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
compReads [2]int64

nWrites [2]int64

commitCounts [2]int64
commitTotalDurations [2]int64
commitSemaphoreWaits [2]int64
commitMemTableWriteStalls [2]int64
commitL0ReadAmpWriteStalls [2]int64
commitWALRotations [2]int64
commitWaits [2]int64
)

// Iterate ad infinitum and collect the stats
Expand All @@ -474,6 +555,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
writeDelayTime = d.writeDelayTime.Load()
nonLevel0CompCount = int64(d.nonLevel0Comp.Load())
level0CompCount = int64(d.level0Comp.Load())

commitCount = d.commitCount.Load()
commitTotalDuration = d.commitTotalDuration.Load()
commitSemaphoreWait = d.commitSemaphoreWait.Load()
commitMemTableWriteStall = d.commitMemTableWriteStall.Load()
commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load()
commitWALRotation = d.commitWALRotation.Load()
commitWait = d.commitWait.Load()
)
writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
Expand Down Expand Up @@ -524,6 +613,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(stats.Compact.ReadCount)

commitCounts[i%2] = commitCount
commitTotalDurations[i%2] = commitTotalDuration
commitSemaphoreWaits[i%2] = commitSemaphoreWait
commitMemTableWriteStalls[i%2] = commitMemTableWriteStall
commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall
commitWALRotations[i%2] = commitWALRotation
commitWaits[i%2] = commitWait

d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2])
d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2])
d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2])
d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2])
d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2])
d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2])
d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2])

d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt))
d.compInProgressGauge.Update(stats.Compact.NumInProgress)

for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
Expand Down Expand Up @@ -578,7 +686,20 @@ func (b *batch) Write() error {
if b.db.closed {
return pebble.ErrClosed
}
return b.b.Commit(b.db.writeOptions)
err := b.b.Commit(b.db.writeOptions)
if err != nil {
return err
}
stats := b.b.CommitStats()
b.db.commitCount.Add(1)
b.db.commitTotalDuration.Add(int64(stats.TotalDuration))
b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration))
b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration))
b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration))
b.db.commitWALRotation.Add(int64(stats.WALRotationDuration))
b.db.commitWait.Add(int64(stats.CommitWaitDuration))
// TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0)
return nil
}

// Reset resets the batch for reuse.
Expand Down
2 changes: 1 addition & 1 deletion ethdb/pebble/pebble_non64bit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
)

func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (ethdb.Database, error) {
return nil, errors.New("pebble is not supported on this platform")
}
37 changes: 24 additions & 13 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -746,6 +747,10 @@ func (n *Node) EventMux() *event.TypeMux {
// previous can be found) from within the node's instance directory. If the node is
// ephemeral, a memory database is returned.
func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) {
return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -758,12 +763,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}

Expand All @@ -779,6 +785,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) {
return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -790,13 +800,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}
if err == nil {
Expand Down
Loading