Skip to content

Commit

Permalink
parallel Txs Prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Sep 10, 2024
1 parent 382282f commit 5fcf598
Showing 1 changed file with 144 additions and 37 deletions.
181 changes: 144 additions & 37 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -201,7 +202,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
p.inConfirmStage2 = false

statedb.PrepareForParallel()
p.allTxReqs = make([]*ParallelTxRequest, 0, txNum)
p.allTxReqs = make([]*ParallelTxRequest, txNum)

for _, slot := range p.slotState {
slot.pendingTxReqList = make([]*ParallelTxRequest, 0)
Expand Down Expand Up @@ -872,48 +873,110 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.commonTxs = make([]*types.Transaction, 0, txNum)
p.receipts = make([]*types.Receipt, 0, txNum)

for i, tx := range allTxs {
// can be moved it into slot for efficiency, but signer is not concurrent safe
// Parallel Execution 1.0&2.0 is for full sync mode, Nonce PreCheck is not necessary
// And since we will do out-of-order execution, the Nonce PreCheck could fail.
// We will disable it and leave it to Parallel 3.0 which is for validator mode
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
parallelNum := p.parallelNum

if txNum > parallelNum*2 && txNum >= 4 {
var wg sync.WaitGroup
errChan := make(chan error)

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
begin := 0
// first try to find latestExcludeTx, as for opBNB, they are the first consecutive txs.
for idx := 0; idx < len(allTxs); idx++ {
if txDAG != nil && txDAG.TxDep(idx).CheckFlag(types.ExcludedTxFlag) {
if err := p.transferTxs(allTxs, idx, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
return nil, nil, 0, err
}
latestExcludedTx = idx
} else {
begin = idx
break
}
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx

// Create a cancelable context
ctx, cancel := context.WithCancel(context.Background())

// Create a pool of workers
transactionsPerWorker := (len(allTxs) - begin) / parallelNum

// Create a pool of workers
for i := 0; i < parallelNum; i++ {
wg.Add(1)
go func(start, end int, signer types.Signer, blk *types.Block, sdb *state.StateDB, cfg vm.Config, usedGas *uint64) {
defer wg.Done()
for j := start; j < end; j++ {
select {
case <-ctx.Done():
return // Exit the goroutine if the context is canceled
default:
if err := p.transferTxs(allTxs, j, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
errChan <- err
cancel() // Cancel the context to stop other goroutines
return
}
}
}
}(begin+i*transactionsPerWorker, begin+(i+1)*transactionsPerWorker, signer, block, statedb, cfg, usedGas)
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
// Distribute any remaining transactions
for i := begin + parallelNum*transactionsPerWorker; i < len(allTxs); i++ {
if err := p.transferTxs(allTxs, i, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
errChan <- err
cancel() // Cancel the context to stop other goroutines
}
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))

// Wait for all workers to finish and handle errors
go func() {
wg.Wait()
close(errChan)
}()

for err := range errChan {
return nil, nil, 0, err
}
p.allTxReqs = append(p.allTxReqs, txReq)
if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
latestExcludedTx = i
//
} else {
for i, tx := range allTxs {
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))
}
p.allTxReqs[i] = txReq
if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
latestExcludedTx = i
}
}
}
allTxCount := len(p.allTxReqs)
Expand Down Expand Up @@ -1064,6 +1127,50 @@ func (p *ParallelStateProcessor) handlePendingResultLoop() {
}
}

func (p *ParallelStateProcessor) transferTxs(txs types.Transactions, i int, signer types.Signer, block *types.Block, statedb *state.StateDB, cfg vm.Config, usedGas *uint64, latestExcludedTx int) error {
if p.allTxReqs[i] != nil {
return nil
}
tx := txs[i]
txDAG := cfg.TxDAG
msg, err := TransactionToMessage(tx, signer, block.Header().BaseFee)
if err != nil {
return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))
}
p.allTxReqs[i] = txReq
return nil
}

func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.ParallelStateDB, evm *vm.EVM, delayGasFee bool) (*vm.EVM, *ExecutionResult, error) {
// Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg)
Expand Down

0 comments on commit 5fcf598

Please sign in to comment.