Skip to content

Commit

Permalink
prepare patch for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Aug 6, 2024
1 parent 0608f11 commit 98fff57
Show file tree
Hide file tree
Showing 33 changed files with 127 additions and 543 deletions.
6 changes: 3 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,11 +1958,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {

if ctx.IsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
// The best parallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.IsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
// Use value set by "--parallel.num", and "--parallel.num 0" is not allowed and be set to 1
parallelNum = ctx.Int(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
Expand All @@ -1972,7 +1972,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
parallelNum = 8
}
cfg.ParallelTxNum = parallelNum
}
Expand Down
10 changes: 2 additions & 8 deletions consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,16 +513,10 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
}
// Finalize block
ethash.Finalize(chain, header, state, txs, uncles, nil)
/*
js, _ := header.MarshalJSON()
fmt.Printf("== Dav -- ethash FinalizeAndAssemble, before Root update, Root %s, header json: %s\n", header.Root, js)
*/

// Assign the final state root to header.
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
/*
js, _ = header.MarshalJSON()
fmt.Printf(" == Dav -- ethash FinalizeAndAssemble, after Root update, Root %s, header json: %s\n", header.Root, js)
*/

// Header seems complete, assemble into a block and return
return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), nil
}
Expand Down
1 change: 1 addition & 0 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if ancestorErr != nil {
return ancestorErr
}

return nil
}

Expand Down
48 changes: 7 additions & 41 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ var (
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")

ParallelTxMode = false // parallel transaction execution
)

const (
Expand Down Expand Up @@ -297,12 +295,13 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config

parallelExecution bool
enableTxDAG bool
txDAGWriteCh chan TxDAGOutputItem
Expand Down Expand Up @@ -1617,7 +1616,6 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types
// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
// This function expects the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {

if err := bc.writeBlockWithState(block, receipts, state); err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1659,7 +1657,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}

return status, nil
}

Expand Down Expand Up @@ -1934,9 +1931,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

// TODO(galaio): load TxDAG from block, use txDAG in some accelerate scenarios, like state pre-fetcher.
//if bc.enableTxDAG {}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb
Expand Down Expand Up @@ -1973,7 +1967,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
ptime := time.Since(pstart)

vstart := time.Now()

if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
Expand Down Expand Up @@ -2007,28 +2000,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if !setHead {
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, statedb)
if false {
fmt.Printf("Dav -- After writeBlockWithState: %d check balance\n", block.NumberU64())
actual := statedb.GetBalance(block.Coinbase())
fmt.Printf("Dav -- AfterwriteBlockWithState: %d balance: %d\n", block.NumberU64(), actual.Uint64())
}
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
if false {
fmt.Printf("Dav -- After writeBlockAndSetHead: %d check balance\n", block.NumberU64())
actual := statedb.GetBalance(block.Coinbase())
fmt.Printf("Dav -- writeBlockAndSetHead: %d balance: %d\n", block.NumberU64(), actual.Uint64())

s, _ := bc.State()
bk := bc.CurrentBlock()
fmt.Printf("Dav -- writeBlockAndSetHead - currentBlock: %d root: %s\n", bk.Number.Uint64(), bk.Root)
obj, _ := s.GetStateObjectFromSnapshotOrTrie(block.Coinbase())

//obj, _ := statedb.GetStateObjectFromSnapshotOrTrie(block.Coinbase())

fmt.Printf("Dav -- writeBlockAndSetHead: %d obj from snap or trie: %p\n", block.NumberU64(), obj)

}
}
followupInterrupt.Store(true)
if err != nil {
Expand Down Expand Up @@ -2775,13 +2748,6 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
}

func (bc *BlockChain) EnableParallelProcessor(parallelNum int) (*BlockChain, error) {
/*
if bc.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return bc, nil
}
*/
bc.parallelExecution = true
bc.processor = NewParallelStateProcessor(bc.Config(), bc, bc.engine, parallelNum)
return bc, nil
Expand Down
4 changes: 0 additions & 4 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1798,13 +1798,9 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}

// fmt.Printf("Dav -- test -- testRepairWithScheme -- chain after NewBlockChain processor: %v, parallel: %v, vmConfig: %v\n", chain.processor, chain.parallelExecution, chain.vmConfig)

// If sidechain blocks are needed, make a light chain and import it
var sideblocks types.Blocks
if tt.sidechainBlocks > 0 {
//fmt.Printf("Dav -- test -- testRepairWithScheme -- tt.sidechainBlocks: %d\n", tt.sidechainBlocks)
sideblocks, _ = GenerateChain(gspec.Config, gspec.ToBlock(), engine, rawdb.NewMemoryDatabase(), tt.sidechainBlocks, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{0x01})
})
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,7 +1741,7 @@ func testEIP161AccountRemoval(t *testing.T, scheme string) {
t.Fatal(err)
}
if st, _ := blockchain.State(); st.Exist(theAddr) {
t.Error("account should not exist", "triExist?", st.TriHasAccount(theAddr), "SnapExist?", st.SnapHasAccount(theAddr))
t.Error("account should not exist")
}

// account mustn't be created post eip 161
Expand Down Expand Up @@ -2172,14 +2172,14 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon
}
nonce++
})

if n, err := chain.InsertChain(blocks); err != nil {
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}

lastPrunedIndex := len(blocks) - TriesInMemory - 1
lastPrunedBlock := blocks[lastPrunedIndex]
firstNonPrunedBlock := blocks[len(blocks)-TriesInMemory]

// Verify pruning of lastPrunedBlock
if chain.HasBlockAndState(lastPrunedBlock.Hash(), lastPrunedBlock.NumberU64()) {
t.Errorf("Block %d not pruned", lastPrunedBlock.NumberU64())
Expand All @@ -2188,6 +2188,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon
if !chain.HasBlockAndState(firstNonPrunedBlock.Hash(), firstNonPrunedBlock.NumberU64()) {
t.Errorf("Block %d pruned", firstNonPrunedBlock.NumberU64())
}

// Activate the transition in the middle of the chain
if mergePoint == 1 {
merger.ReachTTD()
Expand Down Expand Up @@ -3940,8 +3941,7 @@ func testSetCanonical(t *testing.T, scheme string) {
diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
defer diskdb.Close()

chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine,
vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("failed to create tester chain: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,6 @@ var (
// ErrSystemTxNotSupported is returned for any deposit tx with IsSystemTx=true after the Regolith fork
ErrSystemTxNotSupported = errors.New("system tx not supported")

// ErrParallelUnexpectedConflict is returned when execution finally get conflict error for more than block tx number
// ErrParallelUnexpectedConflict is returned when execution finally get conflict error that should not occur
ErrParallelUnexpectedConflict = errors.New("parallel execution unexpected conflict")
)
Loading

0 comments on commit 98fff57

Please sign in to comment.