diff --git a/core/blockchain.go b/core/blockchain.go index 31efef4fba..1a8c7415a0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1956,8 +1956,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) log.Debug("Disable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) } else { bc.UseParallelProcessor() - statedb.CreateParallelDBManager(2 * txsCount) - log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) + if bc.processor == bc.parallelProcessor { + statedb.CreateParallelDBManager(2 * txsCount) + log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) + } } } // If we have a followup block, run that against the current state to pre-cache @@ -2924,7 +2926,8 @@ func (bc *BlockChain) UseParallelProcessor() { bc.parallelExecution = true bc.processor = bc.parallelProcessor } else { - bc.CreateParallelProcessor(bc.vmConfig.ParallelTxNum) + log.Error("bc.ParallelProcessor is nil! fallback to serial processor!") + bc.UseSerialProcessor() } } diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 499f6231a6..e09ce86b38 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -131,6 +131,7 @@ func (p *ParallelStateProcessor) init() { log.Info("Parallel execution mode is enabled", "Parallel Num", p.parallelNum, "CPUNum", runtime.NumCPU()) p.txResultChan = make(chan *ParallelTxResult, 20000) + p.slotDBsToRelease = make([]*state.ParallelStateDB, 200, 20000) p.stopSlotChan = make(chan struct{}, 1) p.stopConfirmChan = make(chan struct{}, 1) p.stopConfirmStage2Chan = make(chan struct{}, 1) @@ -197,14 +198,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { statedb.PrepareForParallel() p.allTxReqs = make([]*ParallelTxRequest, 0, txNum) - p.slotDBsToRelease = make([]*state.ParallelStateDB, 0, txNum) - stateDBsToRelease := p.slotDBsToRelease - go func() { - for _, slotDB := range stateDBsToRelease { - slotDB.PutSyncPool() - } - }() for _, slot := range p.slotState { slot.pendingTxReqList = make([]*ParallelTxRequest, 0) slot.activatedType = parallelPrimarySlot @@ -420,6 +414,8 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo } valid := p.toConfirmTxIndexResult(targetResult, isStage2) + // append result as it is going to be processed or merged. + p.slotDBsToRelease = append(p.slotDBsToRelease, targetResult.slotDB) if !valid { staticSlotIndex := targetResult.txReq.staticSlotIndex conflictBase := targetResult.slotDB.BaseTxIndex() @@ -949,6 +945,16 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat // clean up when the block is processed p.doCleanUp() + + stateDBsToRelease := p.slotDBsToRelease + go func() { + for _, slotDB := range stateDBsToRelease { + if slotDB != nil { + slotDB.PutSyncPool() + } + } + }() + if p.error != nil { return nil, nil, 0, p.error }