
Commit

add pebble extra options
magicxyyz committed Apr 20, 2024
1 parent c36c60f commit f9055c9
Showing 11 changed files with 152 additions and 30 deletions.
3 changes: 2 additions & 1 deletion arbnode/dataposter/storage_test.go
@@ -19,6 +19,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/redis"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/signature"
@@ -44,7 +45,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St

func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage {
t.Helper()
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true)
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions())
if err != nil {
t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err)
}
123 changes: 117 additions & 6 deletions cmd/conf/database.go
@@ -8,17 +8,21 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"time"

"github.com/ethereum/go-ethereum/ethdb/pebble"
flag "github.com/spf13/pflag"
)

type PersistentConfig struct {
GlobalConfig string `koanf:"global-config"`
Chain string `koanf:"chain"`
LogDir string `koanf:"log-dir"`
Handles int `koanf:"handles"`
Ancient string `koanf:"ancient"`
DBEngine string `koanf:"db-engine"`
GlobalConfig string `koanf:"global-config"`
Chain string `koanf:"chain"`
LogDir string `koanf:"log-dir"`
Handles int `koanf:"handles"`
Ancient string `koanf:"ancient"`
DBEngine string `koanf:"db-engine"`
Pebble PebbleConfig `koanf:"pebble"`
}

var PersistentConfigDefault = PersistentConfig{
@@ -28,6 +32,7 @@ var PersistentConfigDefault = PersistentConfig{
Handles: 512,
Ancient: "",
DBEngine: "leveldb",
Pebble: PebbleConfigDefault,
}

func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
@@ -37,6 +42,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database")
f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened")
f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')")
PebbleConfigAddOptions(prefix+".pebble", f)
}

func (c *PersistentConfig) ResolveDirectoryNames() error {
@@ -96,3 +102,108 @@ func (c *PersistentConfig) Validate() error {
}
return nil
}

type PebbleConfig struct {
BytesPerSync int `koanf:"bytes-per-sync"`
L0CompactionFileThreshold int `koanf:"l0-compaction-file-threshold"`
L0CompactionThreshold int `koanf:"l0-compaction-threshold"`
L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"`
LBaseMaxBytes int64 `koanf:"l-base-max-bytes"`
MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"`
DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"`
WALBytesPerSync int `koanf:"wal-bytes-per-sync"`
WALDir string `koanf:"wal-dir"`
WALMinSyncInterval int `koanf:"wal-min-sync-interval"`
TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"`
Experimental PebbleExperimentalConfig `koanf:"experimental"`
}

var PebbleConfigDefault = PebbleConfig{
BytesPerSync: 0, // pebble default will be used
L0CompactionFileThreshold: 0, // pebble default will be used
L0CompactionThreshold: 0, // pebble default will be used
L0StopWritesThreshold: 0, // pebble default will be used
LBaseMaxBytes: 0, // pebble default will be used
MaxConcurrentCompactions: runtime.NumCPU(),
DisableAutomaticCompactions: false,
WALBytesPerSync: 0, // pebble default will be used
WALDir: "", // default will use same dir as for sstables
WALMinSyncInterval: 0, // pebble default will be used
TargetByteDeletionRate: 0, // pebble default will be used
Experimental: PebbleExperimentalConfigDefault,
}

func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".bytes-per-sync", PebbleConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background (0 = pebble default)")
f.Int(prefix+".l0-compaction-file-threshold", PebbleConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction (0 = pebble default)")
f.Int(prefix+".l0-compaction-threshold", PebbleConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction (0 = pebble default)")
f.Int(prefix+".l0-stop-writes-threshold", PebbleConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)")
f.Int64(prefix+".l-base-max-bytes", PebbleConfigDefault.LBaseMaxBytes, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached (0 = pebble default)")
f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions (0 = pebble default)")
f.Bool(prefix+".disable-automatic-compactions", PebbleConfigDefault.DisableAutomaticCompactions, "disables automatic compactions")
f.Int(prefix+".wal-bytes-per-sync", PebbleConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the backgroud (0 = pebble default)")
f.String(prefix+".wal-dir", PebbleConfigDefault.WALDir, "directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables")
f.Int(prefix+".wal-min-sync-interval", PebbleConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.")
PebbleExperimentalConfigAddOptions(".experimental", f)
}

type PebbleExperimentalConfig struct {
L0CompactionConcurrency int `koanf:"l0-compaction-concurrency"`
CompactionDebtConcurrency uint64 `koanf:"compaction-debt-concurrency"`
ReadCompactionRate int64 `koanf:"read-compaction-rate"`
ReadSamplingMultiplier int64 `koanf:"read-sampling-multiplier"`
MaxWriterConcurrency int `koanf:"max-writer-concurrency"`
ForceWriterParallelism bool `koanf:"force-writer-parallelism"`
}

var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{
L0CompactionConcurrency: 0,
CompactionDebtConcurrency: 0,
ReadCompactionRate: 0,
ReadSamplingMultiplier: -1,
MaxWriterConcurrency: 0,
ForceWriterParallelism: false,
}

func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions. (0 = pebble default)")
f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen. (0 = pebble default)")
f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate")
f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB). (0 = pebble default)")
f.Int(prefix+".max-writer-concurrency", PebbleExperimentalConfigDefault.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.")
f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.")
}
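
A minimal usage sketch, separate from the diff above: how the registered options compose into flag names. The "persistent" prefix and the override values are assumptions for illustration; only the flag suffixes come from the AddOptions functions shown here.

package main

import (
	"fmt"

	flag "github.com/spf13/pflag"

	"github.com/offchainlabs/nitro/cmd/conf"
)

func main() {
	// Register the persistent-storage options (including the new pebble ones)
	// on a throwaway FlagSet. The "persistent" prefix is an assumption.
	f := flag.NewFlagSet("example", flag.ContinueOnError)
	conf.PersistentConfigAddOptions("persistent", f)

	// Hypothetical overrides; anything left at its zero value defers to pebble's defaults.
	if err := f.Parse([]string{
		"--persistent.pebble.max-concurrent-compactions=4",
		"--persistent.pebble.experimental.read-sampling-multiplier=-1",
	}); err != nil {
		panic(err)
	}

	v, _ := f.GetInt("persistent.pebble.max-concurrent-compactions")
	fmt.Println(v) // 4
}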

func (c *PebbleConfig) ExtraOptions() *pebble.ExtraOptions {
var maxConcurrentCompactions func() int
if c.MaxConcurrentCompactions > 0 {
maxConcurrentCompactions = func() int { return c.MaxConcurrentCompactions }
}
var walMinSyncInterval func() time.Duration
if c.WALMinSyncInterval > 0 {
walMinSyncInterval = func() time.Duration {
return time.Microsecond * time.Duration(c.WALMinSyncInterval)
}
}
return &pebble.ExtraOptions{
BytesPerSync: c.BytesPerSync,
L0CompactionFileThreshold: c.L0CompactionFileThreshold,
L0CompactionThreshold: c.L0CompactionThreshold,
L0StopWritesThreshold: c.L0StopWritesThreshold,
LBaseMaxBytes: c.LBaseMaxBytes,
MaxConcurrentCompactions: maxConcurrentCompactions,
DisableAutomaticCompactions: c.DisableAutomaticCompactions,
WALBytesPerSync: c.WALBytesPerSync,
WALDir: c.WALDir,
WALMinSyncInterval: walMinSyncInterval,
TargetByteDeletionRate: c.TargetByteDeletionRate,
Experimental: pebble.ExtraOptionsExperimental{
L0CompactionConcurrency: c.Experimental.L0CompactionConcurrency,
CompactionDebtConcurrency: c.Experimental.CompactionDebtConcurrency,
ReadCompactionRate: c.Experimental.ReadCompactionRate,
ReadSamplingMultiplier: c.Experimental.ReadSamplingMultiplier,
MaxWriterConcurrency: c.Experimental.MaxWriterConcurrency,
ForceWriterParallelism: c.Experimental.ForceWriterParallelism,
},
}
}
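
And a sketch of the consuming side, modeled on the newPebbleDBStorage change in storage_test.go above; the database path and the overridden values are illustrative assumptions, and the rawdb call simply mirrors the test's argument order.

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/core/rawdb"

	"github.com/offchainlabs/nitro/cmd/conf"
)

func main() {
	// Start from the defaults added in this commit and tweak a couple of knobs.
	pebbleConf := conf.PersistentConfigDefault.Pebble
	pebbleConf.MaxConcurrentCompactions = 4 // illustrative; the default is runtime.NumCPU()
	pebbleConf.WALMinSyncInterval = 500     // microseconds; 0 keeps pebble's default

	// Zero-valued fields fall through to pebble's own defaults inside ExtraOptions().
	extraOpts := pebbleConf.ExtraOptions()

	// Same argument order as the storage_test.go call above
	// (path, cache, handles, namespace, readonly, ephemeral, extra options).
	db, err := rawdb.NewPebbleDBDatabase("/tmp/example-pebble.db", 0, 0, "default", false, true, extraOpts)
	if err != nil {
		log.Fatalf("NewPebbleDBDatabase: %v", err)
	}
	defer db.Close()
}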
12 changes: 6 additions & 6 deletions cmd/nitro/init.go
@@ -159,19 +159,19 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo
return nil
}

func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
if !config.Init.Force {
if readOnlyDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", 0, 0, "", "l2chaindata/", true); err == nil {
if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions()); err == nil {
if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil {
readOnlyDb.Close()
if !arbmath.BigEquals(chainConfig.ChainID, chainId) {
return nil, nil, fmt.Errorf("database has chain ID %v but config has chain ID %v (are you sure this database is for the right chain?)", chainConfig.ChainID, chainId)
}
chainDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false)
chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions())
if err != nil {
return chainDb, nil, err
}
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
if err != nil {
return chainDb, nil, fmt.Errorf("error pruning: %w", err)
}
@@ -219,7 +219,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo

var initDataReader statetransfer.InitDataReader = nil

chainDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false)
chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions())
if err != nil {
return chainDb, nil, err
}
@@ -367,7 +367,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
return chainDb, l2BlockChain, err
}

err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
if err != nil {
return chainDb, nil, fmt.Errorf("error pruning: %w", err)
}
5 changes: 3 additions & 2 deletions cmd/nitro/nitro.go
@@ -177,6 +177,7 @@ func mainImpl() int {
nodeConfig.Auth.Apply(&stackConf)
nodeConfig.IPC.Apply(&stackConf)
nodeConfig.GraphQL.Apply(&stackConf)

if nodeConfig.WS.ExposeAll {
stackConf.WSModules = append(stackConf.WSModules, "personal")
}
@@ -476,7 +477,7 @@ func mainImpl() int {
}
}

chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), l1Client, rollupAddrs)
chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), &nodeConfig.Persistent, l1Client, rollupAddrs)
if l2BlockChain != nil {
deferFuncs = append(deferFuncs, func() { l2BlockChain.Stop() })
}
@@ -487,7 +488,7 @@
return 1
}

arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false)
arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, nodeConfig.Persistent.Pebble.ExtraOptions())
deferFuncs = append(deferFuncs, func() { closeDb(arbDb, "arbDb") })
if err != nil {
log.Error("failed to open database", "err", err)
8 changes: 4 additions & 4 deletions cmd/pruning/pruning.go
@@ -80,12 +80,12 @@ func (r *importantRoots) addHeader(header *types.Header, overwrite bool) error {
var hashListRegex = regexp.MustCompile("^(0x)?[0-9a-fA-F]{64}(,(0x)?[0-9a-fA-F]{64})*$")

// Finds important roots to retain while proving
func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) ([]common.Hash, error) {
func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) ([]common.Hash, error) {
chainConfig := gethexec.TryReadStoredChainConfig(chainDb)
if chainConfig == nil {
return nil, errors.New("database doesn't have a chain config (was this node initialized?)")
}
arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", true)
arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", true, persistentConfig.Pebble.ExtraOptions())
if err != nil {
return nil, err
}
@@ -232,11 +232,11 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return roots.roots, nil
}

func PruneChainDb(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) error {
func PruneChainDb(ctx context.Context, chainDb ethdb.Database, stack *node.Node, initConfig *conf.InitConfig, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, validatorRequired bool) error {
if initConfig.Prune == "" {
return pruner.RecoverPruning(stack.InstanceDir(), chainDb)
}
root, err := findImportantRoots(ctx, chainDb, stack, initConfig, cacheConfig, l1Client, rollupAddrs, validatorRequired)
root, err := findImportantRoots(ctx, chainDb, stack, initConfig, cacheConfig, persistentConfig, l1Client, rollupAddrs, validatorRequired)
if err != nil {
return fmt.Errorf("failed to find root to retain for pruning: %w", err)
}
2 changes: 1 addition & 1 deletion execution/gethexec/node.go
@@ -216,7 +216,7 @@ func CreateExecutionNode(
var classicOutbox *ClassicOutboxRetriever

if l2BlockChain.Config().ArbitrumChainParams.GenesisBlockNum > 0 {
classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true)
classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "classicmsg/", true) // TODO can we skip using ExtraOptions here?
if err != nil {
log.Warn("Classic Msg Database not found", "err", err)
classicOutbox = nil
2 changes: 1 addition & 1 deletion go-ethereum
