Skip to content

Commit

Permalink
feat: disable parallel when txs count is low (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da authored Nov 26, 2024
1 parent 9ca5adc commit 70107bf
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 3 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ var (
utils.ParallelTxFlag,
utils.ParallelTxUnorderedMergeFlag,
utils.ParallelTxNumFlag,
utils.ParallelThresholdFlag,
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
utils.ParallelTxDAGSenderPrivFlag,
Expand Down
10 changes: 10 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,12 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelThresholdFlag = &cli.IntFlag{
Name: "parallel.threshold",
Usage: "Threshold of transaction count to trigger parallel execution, only valid in parallel mode (runtime calculated, no fixed default value)",
Category: flags.VMCategory,
}

ParallelTxDAGFlag = &cli.BoolFlag{
Name: "parallel.txdag",
Usage: "Enable the experimental parallel TxDAG generation, only valid in full sync mode (default = false)",
Expand Down Expand Up @@ -2034,6 +2040,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxNum = ctx.Int(ParallelTxNumFlag.Name)
}

if ctx.IsSet(ParallelThresholdFlag.Name) {
cfg.ParallelThreshold = ctx.Int(ParallelThresholdFlag.Name)
}

if ctx.IsSet(ParallelTxDAGFlag.Name) {
cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name)
}
Expand Down
9 changes: 8 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
log.Info("Parallel V2 enabled", "parallelNum", ParallelNum())
} else {
bc.processor = NewStateProcessor(chainConfig, bc, engine)
bc.serialProcessor = bc.processor
}
// Start future block processor.
bc.wg.Add(1)
Expand Down Expand Up @@ -1936,9 +1937,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

statedb.SetExpectedStateRoot(block.Root())

// Decide the enabling of parallelExec
invalidParallelConfig := bc.vmConfig.TxDAG == nil && bc.vmConfig.EnableParallelUnorderedMerge
lowTxsNum := bc.vmConfig.ParallelThreshold >= block.Transactions().Len()
useSerialProcessor := invalidParallelConfig || lowTxsNum || !bc.vmConfig.EnableParallelExec

// Process block using the parent state as reference point
pstart = time.Now()
if bc.vmConfig.TxDAG == nil && bc.vmConfig.EnableParallelUnorderedMerge {

if useSerialProcessor {
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
} else {
receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
Expand Down
6 changes: 5 additions & 1 deletion core/pevm_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consens
unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge,
}
initParallelRunner(bc.vmConfig.ParallelTxNum)
if bc.vmConfig.ParallelThreshold == 0 {
bc.vmConfig.ParallelThreshold = ParallelNum()
}
log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(),
"CPUNum", runtime.GOMAXPROCS(0), "unorderedMerge", processor.unorderedMerge)
"CPUNum", runtime.GOMAXPROCS(0), "unorderedMerge", processor.unorderedMerge,
"parallel threshold", bc.vmConfig.ParallelThreshold)
return processor
}

Expand Down
1 change: 1 addition & 0 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
ExtraEips []int // Additional EIPS that are to be enabled
EnableParallelExec bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
ParallelThreshold int // Threshold of transactions number to trigger parallel process
OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism
EnableOpcodeOptimizations bool // Enable opcode optimization
TxDAG types.TxDAG
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnableParallelExec: config.ParallelTxMode,
EnableParallelUnorderedMerge: config.ParallelTxUnorderedMerge,
ParallelTxNum: config.ParallelTxNum,
ParallelThreshold: config.ParallelThreshold,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
}
cacheConfig = &core.CacheConfig{
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ type Config struct {

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
ParallelThreshold int // threshold to trigger parallel execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode

}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down

0 comments on commit 70107bf

Please sign in to comment.