From 98fff57c9abd3de219b0e4eea7a706bfa831d418 Mon Sep 17 00:00:00 2001 From: Sunny Date: Tue, 6 Aug 2024 14:13:39 +0800 Subject: [PATCH] prepare patch for PR --- cmd/utils/flags.go | 6 +- consensus/ethash/consensus.go | 10 +- core/block_validator.go | 1 + core/blockchain.go | 48 +------ core/blockchain_repair_test.go | 4 - core/blockchain_test.go | 8 +- core/error.go | 2 +- core/parallel_state_processor.go | 114 ++++++--------- core/state/journal.go | 1 - core/state/parallel_statedb.go | 231 ++++-------------------------- core/state/snapshot/conversion.go | 21 ++- core/state/snapshot/difflayer.go | 1 - core/state/snapshot/snapshot.go | 5 +- core/state/state_object.go | 85 +---------- core/state/statedb.go | 55 +------ core/state/statedb_test.go | 14 +- core/state_processor.go | 4 +- core/state_processor_test.go | 5 +- core/state_transition.go | 1 - core/types/block.go | 2 - core/types/receipt.go | 9 -- core/vm/evm.go | 8 +- core/vm/gas_table.go | 1 + core/vm/instructions.go | 3 +- core/vm/interface.go | 5 - core/vm/interpreter.go | 4 +- core/vm/operations_acl.go | 3 +- eth/downloader/testchain_test.go | 1 + eth/handler_eth.go | 1 - metrics/exp/exp.go | 3 - miner/worker.go | 10 -- trie/triedb/pathdb/database.go | 3 - trie/triedb/pathdb/disklayer.go | 1 - 33 files changed, 127 insertions(+), 543 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index facd26e628..d716aa6093 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1958,11 +1958,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(ParallelTxFlag.Name) { cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name) - // The best prallel num will be tuned later, we do a simple parallel num set here + // The best parallel num will be tuned later, we do a simple parallel num set here numCpu := runtime.NumCPU() var parallelNum int if ctx.IsSet(ParallelTxNumFlag.Name) { - // first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed + // Use value set by "--parallel.num", and "--parallel.num 0" is not allowed and be set to 1 parallelNum = ctx.Int(ParallelTxNumFlag.Name) if parallelNum < 1 { parallelNum = 1 @@ -1972,7 +1972,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { } else if numCpu < 10 { parallelNum = numCpu - 1 } else { - parallelNum = 8 // we found concurrency 8 is slightly better than 15 + parallelNum = 8 } cfg.ParallelTxNum = parallelNum } diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 039b712288..130dfdf213 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -513,16 +513,10 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea } // Finalize block ethash.Finalize(chain, header, state, txs, uncles, nil) - /* - js, _ := header.MarshalJSON() - fmt.Printf("== Dav -- ethash FinalizeAndAssemble, before Root update, Root %s, header json: %s\n", header.Root, js) - */ + // Assign the final state root to header. header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) - /* - js, _ = header.MarshalJSON() - fmt.Printf(" == Dav -- ethash FinalizeAndAssemble, after Root update, Root %s, header json: %s\n", header.Root, js) - */ + // Header seems complete, assemble into a block and return return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), nil } diff --git a/core/block_validator.go b/core/block_validator.go index 538cea51b0..79839d7176 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -156,6 +156,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if ancestorErr != nil { return ancestorErr } + return nil } diff --git a/core/blockchain.go b/core/blockchain.go index 4b6cddcaab..4105150a5f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -113,8 +113,6 @@ var ( errChainStopped = errors.New("blockchain is stopped") errInvalidOldChain = errors.New("invalid old chain") errInvalidNewChain = errors.New("invalid new chain") - - ParallelTxMode = false // parallel transaction execution ) const ( @@ -297,12 +295,13 @@ type BlockChain struct { stopping atomic.Bool // false if chain is running, true when stopped procInterrupt atomic.Bool // interrupt signaler for block processing - engine consensus.Engine - validator Validator // Block and state validator interface - prefetcher Prefetcher - processor Processor // Block transaction processor interface - forker *ForkChoice - vmConfig vm.Config + engine consensus.Engine + validator Validator // Block and state validator interface + prefetcher Prefetcher + processor Processor // Block transaction processor interface + forker *ForkChoice + vmConfig vm.Config + parallelExecution bool enableTxDAG bool txDAGWriteCh chan TxDAGOutputItem @@ -1617,7 +1616,6 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - if err := bc.writeBlockWithState(block, receipts, state); err != nil { return NonStatTy, err } @@ -1659,7 +1657,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } - return status, nil } @@ -1934,9 +1931,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } - // TODO(galaio): load TxDAG from block, use txDAG in some accelerate scenarios, like state pre-fetcher. - //if bc.enableTxDAG {} - // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1973,7 +1967,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) ptime := time.Since(pstart) vstart := time.Now() - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) @@ -2007,28 +2000,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if !setHead { // Don't set the head, only insert the block err = bc.writeBlockWithState(block, receipts, statedb) - if false { - fmt.Printf("Dav -- After writeBlockWithState: %d check balance\n", block.NumberU64()) - actual := statedb.GetBalance(block.Coinbase()) - fmt.Printf("Dav -- AfterwriteBlockWithState: %d balance: %d\n", block.NumberU64(), actual.Uint64()) - } } else { status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) - if false { - fmt.Printf("Dav -- After writeBlockAndSetHead: %d check balance\n", block.NumberU64()) - actual := statedb.GetBalance(block.Coinbase()) - fmt.Printf("Dav -- writeBlockAndSetHead: %d balance: %d\n", block.NumberU64(), actual.Uint64()) - - s, _ := bc.State() - bk := bc.CurrentBlock() - fmt.Printf("Dav -- writeBlockAndSetHead - currentBlock: %d root: %s\n", bk.Number.Uint64(), bk.Root) - obj, _ := s.GetStateObjectFromSnapshotOrTrie(block.Coinbase()) - - //obj, _ := statedb.GetStateObjectFromSnapshotOrTrie(block.Coinbase()) - - fmt.Printf("Dav -- writeBlockAndSetHead: %d obj from snap or trie: %p\n", block.NumberU64(), obj) - - } } followupInterrupt.Store(true) if err != nil { @@ -2775,13 +2748,6 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration { } func (bc *BlockChain) EnableParallelProcessor(parallelNum int) (*BlockChain, error) { - /* - if bc.snaps == nil { - // disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie - log.Info("parallel processor is not enabled since snapshot is not enabled") - return bc, nil - } - */ bc.parallelExecution = true bc.processor = NewParallelStateProcessor(bc.Config(), bc, bc.engine, parallelNum) return bc, nil diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 7fe44bf14c..7f3d2b9983 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1798,13 +1798,9 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s if err != nil { t.Fatalf("Failed to create chain: %v", err) } - - // fmt.Printf("Dav -- test -- testRepairWithScheme -- chain after NewBlockChain processor: %v, parallel: %v, vmConfig: %v\n", chain.processor, chain.parallelExecution, chain.vmConfig) - // If sidechain blocks are needed, make a light chain and import it var sideblocks types.Blocks if tt.sidechainBlocks > 0 { - //fmt.Printf("Dav -- test -- testRepairWithScheme -- tt.sidechainBlocks: %d\n", tt.sidechainBlocks) sideblocks, _ = GenerateChain(gspec.Config, gspec.ToBlock(), engine, rawdb.NewMemoryDatabase(), tt.sidechainBlocks, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{0x01}) }) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index ead47a6060..b871ef586c 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1741,7 +1741,7 @@ func testEIP161AccountRemoval(t *testing.T, scheme string) { t.Fatal(err) } if st, _ := blockchain.State(); st.Exist(theAddr) { - t.Error("account should not exist", "triExist?", st.TriHasAccount(theAddr), "SnapExist?", st.SnapHasAccount(theAddr)) + t.Error("account should not exist") } // account mustn't be created post eip 161 @@ -2172,7 +2172,6 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon } nonce++ }) - if n, err := chain.InsertChain(blocks); err != nil { t.Fatalf("block %d: failed to insert into chain: %v", n, err) } @@ -2180,6 +2179,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon lastPrunedIndex := len(blocks) - TriesInMemory - 1 lastPrunedBlock := blocks[lastPrunedIndex] firstNonPrunedBlock := blocks[len(blocks)-TriesInMemory] + // Verify pruning of lastPrunedBlock if chain.HasBlockAndState(lastPrunedBlock.Hash(), lastPrunedBlock.NumberU64()) { t.Errorf("Block %d not pruned", lastPrunedBlock.NumberU64()) @@ -2188,6 +2188,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon if !chain.HasBlockAndState(firstNonPrunedBlock.Hash(), firstNonPrunedBlock.NumberU64()) { t.Errorf("Block %d pruned", firstNonPrunedBlock.NumberU64()) } + // Activate the transition in the middle of the chain if mergePoint == 1 { merger.ReachTTD() @@ -3940,8 +3941,7 @@ func testSetCanonical(t *testing.T, scheme string) { diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) defer diskdb.Close() - chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, - vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil) + chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } diff --git a/core/error.go b/core/error.go index 0942283422..a23fe8904f 100644 --- a/core/error.go +++ b/core/error.go @@ -108,6 +108,6 @@ var ( // ErrSystemTxNotSupported is returned for any deposit tx with IsSystemTx=true after the Regolith fork ErrSystemTxNotSupported = errors.New("system tx not supported") - // ErrParallelUnexpectedConflict is returned when execution finally get conflict error for more than block tx number + // ErrParallelUnexpectedConflict is returned when execution finally get conflict error that should not occur ErrParallelUnexpectedConflict = errors.New("parallel execution unexpected conflict") ) diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 018e66f19a..f5c348e18c 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -31,23 +31,23 @@ type ParallelStateProcessor struct { slotState []*SlotState // idle, or pending messages allTxReqs []*ParallelTxRequest txResultChan chan *ParallelTxResult // to notify dispatcher that a tx is done - mergedTxIndex atomic.Int32 // the latest finalized tx index, fixme: use Atomic + mergedTxIndex atomic.Int32 // the latest finalized tx index pendingConfirmResults map[int][]*ParallelTxResult // tx could be executed several times, with several result to check - unconfirmedResults *sync.Map // this is for stage2 confirm, since pendingConfirmResults can not be accessed in stage2 loop - unconfirmedDBs *sync.Map + unconfirmedResults *sync.Map // for stage2 confirm, since pendingConfirmResults can not be accessed in stage2 loop + unconfirmedDBs *sync.Map // intermediate store of slotDB that is not verified slotDBsToRelease []*state.ParallelStateDB stopSlotChan chan struct{} stopConfirmChan chan struct{} debugConflictRedoNum int - // start for confirm stage2 + confirmStage2Chan chan int stopConfirmStage2Chan chan struct{} txReqExecuteRecord map[int]int txReqExecuteCount int inConfirmStage2 bool - targetStage2Count int // when executed txNUM reach it, enter stage2 RT confirm + targetStage2Count int nextStage2TxIndex int - delayGasFee bool // it is provided by TxDAG + delayGasFee bool } func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int) *ParallelStateProcessor { @@ -60,7 +60,7 @@ func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engin } type MergedTxInfo struct { - slotDB *state.StateDB // used for SlotDb reuse only, otherwise, it can be discarded + slotDB *state.StateDB StateObjectSuicided map[common.Address]struct{} StateChangeSet map[common.Address]state.StateKeys BalanceChangeSet map[common.Address]struct{} @@ -79,11 +79,11 @@ type SlotState struct { } type ParallelTxResult struct { - executedIndex int32 // the TxReq can be executed several time, increase index for each execution - slotIndex int // slot index + executedIndex int32 // record the current execute number of the tx + slotIndex int txReq *ParallelTxRequest receipt *types.Receipt - slotDB *state.ParallelStateDB // if updated, it is not equal to txReq.slotDB + slotDB *state.ParallelStateDB gpSlot *GasPool evm *vm.EVM result *ExecutionResult @@ -94,7 +94,7 @@ type ParallelTxResult struct { type ParallelTxRequest struct { txIndex int baseStateDB *state.StateDB - staticSlotIndex int // static dispatched id + staticSlotIndex int tx *types.Transaction gasLimit uint64 msg *Message @@ -102,14 +102,12 @@ type ParallelTxRequest struct { vmConfig vm.Config usedGas *uint64 curTxChan chan int - systemAddrRedo bool - runnable int32 // 0: not runnable, executing, 1: runnable, on hold, can be scheduled + runnable int32 // 0: not runnable 1: runnable - can be scheduled executedNum atomic.Int32 - retryNum int32 - conflictIndex atomic.Int32 + conflictIndex atomic.Int32 // the conflicted mainDB tx number, the txs will not be executed before this number } -// to create and start the execution slot goroutines +// init to initialize and start the execution goroutines func (p *ParallelStateProcessor) init() { log.Info("Parallel execution mode is enabled", "Parallel Num", p.parallelNum, "CPUNum", runtime.NumCPU()) @@ -135,18 +133,18 @@ func (p *ParallelStateProcessor) init() { // It is back up of the primary slot to make sure transaction can be redone ASAP, // since the primary slot could be busy at executing another transaction go func(slotIndex int) { - p.runSlotLoop(slotIndex, parallelShadowSlot) // this loop will be permanent live + p.runSlotLoop(slotIndex, parallelShadowSlot) }(i) } p.confirmStage2Chan = make(chan int, 10) go func() { - p.runConfirmStage2Loop() // this loop will be permanent live + p.runConfirmStage2Loop() }() } -// clear slot state for each block. +// resetState clear slot state for each block. func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { if txNum == 0 { return @@ -189,7 +187,7 @@ func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) { for _, txReq := range txReqs { var slotIndex = -1 if i, ok := fromSlotMap[txReq.msg.From]; ok { - // first: same From are all in same slot + // first: same From goes to same slot slotIndex = i } else if txReq.msg.To != nil { // To Address, with txIndex sorted, could be in different slot. @@ -232,16 +230,16 @@ func (p *ParallelStateProcessor) mostHungrySlot() int { return slotIndex } -// do conflict detect +// hasConflict conducts conflict check func (p *ParallelStateProcessor) hasConflict(txResult *ParallelTxResult, isStage2 bool) bool { slotDB := txResult.slotDB if txResult.err != nil { return true } else if slotDB.NeedsRedo() { - // if this is any reason that indicates this transaction needs to redo, skip the conflict check + // if there is any reason that indicates this transaction needs to redo, skip the conflict check return true } else { - // to check if what the slot db read is correct. + // check whether the slot db reads during execution are correct. if !slotDB.IsParallelReadsValid(isStage2) { return true } @@ -254,21 +252,22 @@ func (p *ParallelStateProcessor) switchSlot(slotIndex int) { if atomic.CompareAndSwapInt32(&slot.activatedType, parallelPrimarySlot, parallelShadowSlot) { // switch from normal to shadow slot if len(slot.shadowWakeUpChan) == 0 { - slot.shadowWakeUpChan <- struct{}{} // only notify when target once + slot.shadowWakeUpChan <- struct{}{} } } else if atomic.CompareAndSwapInt32(&slot.activatedType, parallelShadowSlot, parallelPrimarySlot) { // switch from shadow to normal slot if len(slot.primaryWakeUpChan) == 0 { - slot.primaryWakeUpChan <- struct{}{} // only notify when target once + slot.primaryWakeUpChan <- struct{}{} } } } +// executeInSlot do tx execution with thread local slot. func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult { mIndex := p.mergedTxIndex.Load() conflictIndex := txReq.conflictIndex.Load() if mIndex < conflictIndex { - // The conflicted TX has not been finished executing, skip execution. + // The conflicted TX has not been finished executing, skip. // the transaction failed at check(nonce or balance), actually it has not been executed yet. atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1) return nil @@ -298,7 +297,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR executedIndex: execNum, slotIndex: slotIndex, txReq: txReq, - receipt: nil, // receipt is generated in finalize stage + receipt: nil, slotDB: slotDB, err: err, gpSlot: gpSlot, @@ -311,11 +310,11 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR p.unconfirmedDBs.Store(txReq.txIndex, slotDB) } else { // the transaction failed at check(nonce or balance), actually it has not been executed yet. - // the error here can be both expected and unexpected + // the error here can be either expected or unexpected. // expected - the execution is correct and the error is normal result // unexpected - the execution is incorrectly accessed the state because of parallelization. // In both case, rerun with next version of stateDB, it is a waste and buggy to rerun with same - // version of stateDB. + // version of stateDB that has been marked conflict. // Therefore, treat it as conflict and rerun, leave the result to conflict check. // Load conflict as it maybe updated by conflict checker or other execution slots. // use old mIndex so that we can try the new one that is updated by other thread of merging @@ -337,7 +336,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR return &txResult } -// to confirm a serial TxResults with same txIndex +// toConfirmTxIndex confirm a serial TxResults with same txIndex func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bool) *ParallelTxResult { if isStage2 { if targetTxIndex <= int(p.mergedTxIndex.Load())+1 { @@ -381,12 +380,11 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo valid := p.toConfirmTxIndexResult(targetResult, isStage2) if !valid { - staticSlotIndex := targetResult.txReq.staticSlotIndex // it is better to run the TxReq in its static dispatch slot + staticSlotIndex := targetResult.txReq.staticSlotIndex conflictBase := targetResult.slotDB.BaseTxIndex() conflictIndex := targetResult.txReq.conflictIndex.Load() if conflictIndex < int32(conflictBase) { if targetResult.txReq.conflictIndex.CompareAndSwap(conflictIndex, int32(conflictBase)) { - // updated successfully log.Debug("Update conflict index", "conflictIndex", conflictIndex, "conflictBase", conflictBase) } } @@ -403,14 +401,6 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo // TODO-dav: p.mergedTxIndex+2 may be more reasonable? - this is buggy for expected exit if targetResult.txReq.txIndex == int(p.mergedTxIndex.Load())+1 && targetResult.slotDB.BaseTxIndex() == int(p.mergedTxIndex.Load()) { - /* - // txReq is the next to merge - if atomic.LoadInt32(&targetResult.txReq.retryNum) <= int32(blockTxCount)+3000 { - atomic.AddInt32(&targetResult.txReq.retryNum, 1) - // conflict retry - } else { - */ - // retry many times and still conflict, either the tx is expected to be wrong, or something wrong. if targetResult.err != nil { if false { // TODO: delete the printf fmt.Printf("!!!!!!!!!!! Parallel execution exited with error!!!!!, txIndex:%d, err: %v\n", targetResult.txReq.txIndex, targetResult.err) @@ -497,12 +487,9 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { if atomic.LoadInt32(&curSlot.activatedType) != slotType { interrupted = true - // fmt.Printf("Dav -- runInLoop, - activatedType - TxREQ: %d\n", txReq.txIndex) - break } if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { - // not swapped: txReq.runnable == 0 continue } res := p.executeInSlot(slotIndex, txReq) @@ -510,7 +497,6 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { continue } p.txResultChan <- res - // fmt.Printf("Dav -- runInLoop, - loopbody tail - TxREQ: %d\n", txReq.txIndex) } // switched to the other slot. if interrupted { @@ -520,37 +506,28 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { // txReq in this Slot have all been executed, try steal one from other slot. // as long as the TxReq is runnable, we steal it, mark it as stolen for _, stealTxReq := range p.allTxReqs { - // fmt.Printf("Dav -- stealLoop, handle TxREQ: %d\n", stealTxReq.txIndex) if stealTxReq.txIndex <= int(p.mergedTxIndex.Load()) { - // fmt.Printf("Dav -- stealLoop, - txReq.txIndex <= p.mergedTxIndex - TxREQ: %d\n", stealTxReq.txIndex) continue } if atomic.LoadInt32(&curSlot.activatedType) != slotType { interrupted = true - // fmt.Printf("Dav -- stealLoop, - activatedType - TxREQ: %d\n", stealTxReq.txIndex) break } if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) { - // not swapped: txReq.runnable == 0 - // fmt.Printf("Dav -- stealLoop, - not runnable - TxREQ: %d\n", stealTxReq.txIndex) - continue } - // fmt.Printf("Dav -- stealLoop, - executeInSlot - TxREQ: %d\n", stealTxReq.txIndex) res := p.executeInSlot(slotIndex, stealTxReq) if res == nil { continue } p.txResultChan <- res - // fmt.Printf("Dav -- stealLoop, - loopbody tail - TxREQ: %d\n", stealTxReq.txIndex) } } } func (p *ParallelStateProcessor) runConfirmStage2Loop() { for { - // var mergedTxIndex int select { case <-p.stopConfirmStage2Chan: for len(p.confirmStage2Chan) > 0 { @@ -566,11 +543,6 @@ func (p *ParallelStateProcessor) runConfirmStage2Loop() { // stage 2,if all tx have been executed at least once, and its result has been received. // in Stage 2, we will run check when merge is advanced. // more aggressive tx result confirm, even for these Txs not in turn - // now we will be more aggressive: - // do conflict check , as long as tx result is generated, - // if lucky, it is the Tx's turn, we will do conflict check with WBNB makeup - // otherwise, do conflict check without WBNB makeup, but we will ignore WBNB's balance conflict. - // throw these likely conflicted tx back to re-execute startTxIndex := int(p.mergedTxIndex.Load()) + 2 // stage 2's will start from the next target merge index endTxIndex := startTxIndex + stage2CheckNumber txSize := len(p.allTxReqs) @@ -578,7 +550,6 @@ func (p *ParallelStateProcessor) runConfirmStage2Loop() { endTxIndex = txSize - 1 } log.Debug("runConfirmStage2Loop", "startTxIndex", startTxIndex, "endTxIndex", endTxIndex) - // conflictNumMark := p.debugConflictRedoNum for txIndex := startTxIndex; txIndex < endTxIndex; txIndex++ { p.toConfirmTxIndex(txIndex, true) } @@ -587,7 +558,6 @@ func (p *ParallelStateProcessor) runConfirmStage2Loop() { p.switchSlot(i) } } - } func (p *ParallelStateProcessor) handleTxResults() *ParallelTxResult { @@ -683,7 +653,7 @@ func (p *ParallelStateProcessor) doCleanUp() { } // 2.discard delayed txResults if any for { - if len(p.txResultChan) > 0 { // drop prefetch addr? + if len(p.txResultChan) > 0 { <-p.txResultChan continue } @@ -694,7 +664,7 @@ func (p *ParallelStateProcessor) doCleanUp() { <-p.stopSlotChan } -// Implement BEP-130: Parallel Transaction Execution. +// Process implements BEP-130 Parallel Transaction Execution func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { var ( receipts types.Receipts @@ -780,9 +750,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat vmConfig: cfg, usedGas: usedGas, curTxChan: make(chan int, 1), - systemAddrRedo: false, // set to true, when systemAddr access is detected. - runnable: 1, // 0: not runnable, 1: runnable - retryNum: 0, + runnable: 1, // 0: not runnable, 1: runnable } txReq.executedNum.Store(0) txReq.conflictIndex.Store(-2) @@ -822,7 +790,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat unconfirmedResult := <-p.txResultChan unconfirmedTxIndex := unconfirmedResult.txReq.txIndex if unconfirmedTxIndex <= int(p.mergedTxIndex.Load()) { - // log.Warn("drop merged txReq", "unconfirmedTxIndex", unconfirmedTxIndex, "p.mergedTxIndex", p.mergedTxIndex) + log.Warn("drop merged txReq", "unconfirmedTxIndex", unconfirmedTxIndex, "p.mergedTxIndex", p.mergedTxIndex) continue } p.pendingConfirmResults[unconfirmedTxIndex] = append(p.pendingConfirmResults[unconfirmedTxIndex], unconfirmedResult) @@ -832,8 +800,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat if _, ok := p.txReqExecuteRecord[unconfirmedTxIndex]; !ok { p.txReqExecuteRecord[unconfirmedTxIndex] = 0 p.txReqExecuteCount++ - statedb.AddrPrefetch(unconfirmedResult.slotDB) // todo: prefetch when it is not merged - // enter stage2, RT confirm + statedb.AddrPrefetch(unconfirmedResult.slotDB) if !p.inConfirmStage2 && p.txReqExecuteCount == p.targetStage2Count { p.inConfirmStage2 = true } @@ -858,7 +825,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat } } - // to do clean up when the block is processed + // clean up when the block is processed p.doCleanUp() // len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486 @@ -909,13 +876,12 @@ func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.Pa return evm, result, err } -func applyTransactionStageFinalization(evm *vm.EVM, result *ExecutionResult, msg Message, config *params.ChainConfig, - statedb *state.ParallelStateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, nonce *uint64) (*types.Receipt, error) { +func applyTransactionStageFinalization(evm *vm.EVM, result *ExecutionResult, msg Message, + config *params.ChainConfig, statedb *state.ParallelStateDB, header *types.Header, + tx *types.Transaction, usedGas *uint64, nonce *uint64) (*types.Receipt, error) { *usedGas += result.UsedGas - - // Create a new receipt for the transaction, storing the intermediate root and gas used - // by the tx. + // Create a new receipt for the transaction, storing the intermediate root and gas used by the tx. receipt := &types.Receipt{Type: tx.Type(), PostState: nil, CumulativeGasUsed: *usedGas} if result.Failed() { receipt.Status = types.ReceiptStatusFailed diff --git a/core/state/journal.go b/core/state/journal.go index 036868948f..bb40a62d68 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -50,7 +50,6 @@ func newJournal() *journal { // append inserts a new modification entry to the end of the change journal. func (j *journal) append(entry journalEntry) { j.entries = append(j.entries, entry) - if addr := entry.dirtied(); addr != nil { j.dirties[*addr]++ } diff --git a/core/state/parallel_statedb.go b/core/state/parallel_statedb.go index ebe07b8297..4f95b44556 100644 --- a/core/state/parallel_statedb.go +++ b/core/state/parallel_statedb.go @@ -6,12 +6,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "math/big" "runtime" "sort" "sync" - "time" ) const defaultNumOfSlots = 100 @@ -58,8 +56,7 @@ func hasKvConflict(slotDB *ParallelStateDB, addr common.Address, key common.Hash "key", key, "valSlot", val, "valMain", valMain, "SlotIndex", slotDB.parallel.SlotIndex, "txIndex", slotDB.txIndex, "baseTxIndex", slotDB.parallel.baseTxIndex) - - return true // return false, Range will be terminated. + return true } return false } @@ -82,14 +79,13 @@ func StartKvCheckLoop() { // NewSlotDB creates a new State DB based on the provided StateDB. // With parallel, each execution slot would have its own StateDB. // This method must be called after the baseDB call PrepareParallel() -func NewSlotDB(db *StateDB, txIndex int, baseTxIndex int, unconfirmedDBs *sync.Map /*map[int]*ParallelStateDB*/) *ParallelStateDB { +func NewSlotDB(db *StateDB, txIndex int, baseTxIndex int, unconfirmedDBs *sync.Map) *ParallelStateDB { slotDB := db.CopyForSlot() slotDB.txIndex = txIndex slotDB.originalRoot = db.originalRoot slotDB.parallel.baseStateDB = db slotDB.parallel.baseTxIndex = baseTxIndex slotDB.parallel.unconfirmedDBs = unconfirmedDBs - return slotDB } @@ -111,7 +107,8 @@ func (s *ParallelStateDB) RevertSlotDB(from common.Address) { s.parallel.nonceChangesInSlot[from] = struct{}{} } -func (s *ParallelStateDB) getBaseStateDB() *StateDB { +// getStateDBBasePtr get the pointer of parallelStateDB. +func (s *ParallelStateDB) getStateDBBasePtr() *StateDB { return &s.StateDB } @@ -119,8 +116,8 @@ func (s *ParallelStateDB) SetSlotIndex(index int) { s.parallel.SlotIndex = index } -// for parallel execution mode, try to get dirty StateObject in slot first. -// it is mainly used by journal revert right now. +// getStateObject get the state object from parallel stateDB for journal revert. +// for parallel execution, try to get dirty StateObject in slot first. func (s *ParallelStateDB) getStateObject(addr common.Address) *stateObject { var object *stateObject if obj, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; ok { @@ -129,21 +126,14 @@ func (s *ParallelStateDB) getStateObject(addr common.Address) *stateObject { } object = obj } else { - // can not call s.StateDB.getStateObject(), since `newObject` need ParallelStateDB as the interface object = s.getStateObjectNoSlot(addr) } return object } func (s *ParallelStateDB) storeStateObj(addr common.Address, stateObject *stateObject) { - // When a state object is stored into s.parallel.stateObjects, - // it belongs to base StateDB, it is confirmed and valid. - // todo Dav: why need change this? -- delete me ! - // stateObject.db = s.parallel.baseStateDB - // stateObject.dbItf = s.parallel.baseStateDB - - // the object could be created in SlotDB, if it got the object from DB and - // update it to the shared `s.parallel.stateObjects`` + // The object could be created in SlotDB, if it got the object from DB and + // update it to the `s.parallel.stateObjects` stateObject.db.parallelStateAccessLock.Lock() if _, ok := s.parallel.stateObjects.Load(addr); !ok { s.parallel.stateObjects.Store(addr, stateObject) @@ -176,7 +166,6 @@ func (s *ParallelStateDB) getStateObjectNoSlot(addr common.Address) *stateObject // c.as `snapDestructs` it is the same func (s *ParallelStateDB) createObject(addr common.Address) (newobj *stateObject) { prev := s.parallel.dirtiedStateObjectsInSlot[addr] - // TODO-dav: check // There can be tx0 create an obj at addr0, tx1 destruct it, and tx2 recreate it use create2. // so if tx0 is finalized, and tx1 is unconfirmed, we have to check the states of unconfirmed, otherwise there // will be wrong behavior that we recreate an object that is already there. see. test "TestDeleteThenCreate" @@ -258,7 +247,6 @@ func (s *ParallelStateDB) GetOrNewStateObject(addr common.Address) *stateObject } // not found, or found in NoSlot or found in unconfirmed. exist := true - // TODO-dav: the check of nil and delete already done by NoSlot and unconfirmedDB, may optimize it for dirty only. if object == nil || object.deleted { object = s.createObject(addr) exist = false @@ -326,7 +314,7 @@ func (s *ParallelStateDB) Empty(addr common.Address) bool { } // 2.2 Try to get from unconfirmed DB if exist if exist, ok := s.getAddrStateFromUnconfirmedDB(addr, true); ok { - s.parallel.addrStateReadsInSlot[addr] = exist // update and cache + s.parallel.addrStateReadsInSlot[addr] = exist // update read cache return !exist } // 2.3 Try to get from NoSlot. @@ -385,11 +373,6 @@ func (s *ParallelStateDB) GetBalance(addr common.Address) *big.Int { return balance } -// GetBalanceOpCode different from GetBalance(), it is opcode triggered -func (s *ParallelStateDB) GetBalanceOpCode(addr common.Address) *big.Int { - return s.GetBalance(addr) -} - func (s *ParallelStateDB) GetNonce(addr common.Address) uint64 { var dirtyObj *stateObject // 0. Test whether it is deleted in dirty. @@ -500,7 +483,7 @@ func (s *ParallelStateDB) GetCodeSize(addr common.Address) int { var code []byte // 2.2 Try to get from unconfirmed DB if exist if cd, ok := s.getCodeFromUnconfirmedDB(addr); ok { - cs = len(cd) // len(nil) is 0 too + cs = len(cd) code = cd } else { // 3. Try to get from main StateObject @@ -562,7 +545,7 @@ func (s *ParallelStateDB) GetCodeHash(addr common.Address) common.Hash { } s.parallel.codeHashReadsInSlot[addr] = codeHash - // fill slots in dirty if exist. + // fill slots in dirty if existed. // A case for this: // TX0: createAccount at addr 0x123, set code and codehash // TX1: AddBalance - now an obj in dirty with empty codehash, and codeChangesInSlot is false (not changed) @@ -600,10 +583,8 @@ func (s *ParallelStateDB) GetState(addr common.Address, hash common.Hash) common // 1.Try to get from dirty if exist, ok := s.parallel.addrStateChangesInSlot[addr]; ok { if !exist { - // it could be suicided within this SlotDB? - // it should be able to get state from suicided address within a Tx: - // e.g. within a transaction: call addr:suicide -> get state: should be ok - // return common.Hash{} + // it should be able to get state from selfDestruct address within a Tx: + // e.g. within a transaction: call addr:selfDestruct -> get state: should be ok log.Info("ParallelStateDB GetState suicided", "addr", addr, "hash", hash) } else { // It is possible that an object get created but not dirtied since there is no state set, such as recreate. @@ -642,8 +623,8 @@ func (s *ParallelStateDB) GetState(addr common.Address, hash common.Hash) common return val } } - // 2.2 Object in dirty because of other changes, such as getBalance etc. - // load from dirty directly and the stateObject.GetState() will care of the KvReadInSlot update. + // 2.2 Object in dirty due to other changes, such as getBalance etc. + // load from dirty directly and the stateObject.GetState() will take care of the KvReadInSlot update. // So there is no chance for create different objects with same address. (one in dirty and one from non-slot, and inconsistency) if dirtyObj != nil { return dirtyObj.GetState(hash) @@ -672,21 +653,21 @@ func (s *ParallelStateDB) GetState(addr common.Address, hash common.Hash) common // So it should not access/update dirty, and not check delete of dirty objects. func (s *ParallelStateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash { - // 2.Try to get from unconfirmed DB or main DB + // 1.Try to get from unconfirmed DB or main DB // KVs in unconfirmed DB can be seen as pending storage // KVs in main DB are merged from SlotDB and has done finalise() on merge, can be seen as pending storage too. - // 2.1 Already read before + // 1.1 Already read before if storage, ok := s.parallel.kvReadsInSlot[addr]; ok { if val, ok := storage.GetValue(hash); ok { return val } } value := common.Hash{} - // 2.2 Try to get from unconfirmed DB if exist + // 1.2 Try to get from unconfirmed DB if exist if val, ok := s.getKVFromUnconfirmedDB(addr, hash); ok { value = val } else { - // 3. Try to get from main DB + // 2. Try to get from main DB val = common.Hash{} object := s.getStateObjectNoSlot(addr) if object != nil { @@ -698,7 +679,6 @@ func (s *ParallelStateDB) GetCommittedState(addr common.Address, hash common.Has s.parallel.kvReadsInSlot[addr] = newStorage(false) } s.parallel.kvReadsInSlot[addr].StoreValue(hash, value) // update cache - return value } @@ -729,7 +709,7 @@ func (s *ParallelStateDB) AddBalance(addr common.Address, amount *big.Int) { object := s.GetOrNewStateObject(addr) if object != nil { if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { - newStateObject := object.lightCopy(s) // light copy from main DB + newStateObject := object.lightCopy(s) // do balance fixup from the confirmed DB, it could be more reliable than main DB balance := s.GetBalance(addr) // it will record the balance read operation newStateObject.setBalance(balance) @@ -757,7 +737,7 @@ func (s *ParallelStateDB) SubBalance(addr common.Address, amount *big.Int) { object := s.GetOrNewStateObject(addr) if object != nil { if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { - newStateObject := object.lightCopy(s) // light copy from main DB + newStateObject := object.lightCopy(s) // do balance fixup from the confirmed DB, it could be more reliable than main DB balance := s.GetBalance(addr) newStateObject.setBalance(balance) @@ -845,11 +825,6 @@ func (s *ParallelStateDB) SetState(addr common.Address, key, value common.Hash) object := s.GetOrNewStateObject(addr) // attention: if StateObject's lightCopy, its storage is only a part of the full storage, if object != nil { if s.parallel.baseTxIndex+1 == s.txIndex { - // we check if state is unchanged - // only when current transaction is the next transaction to be committed - // fixme: there is a bug, block: 14,962,284, - // stateObject is in dirty (light copy), but the key is in mainStateDB - // stateObject dirty -> committed, will skip mainStateDB dirty if s.GetState(addr, key) == value { log.Debug("Skip set same state", "baseTxIndex", s.parallel.baseTxIndex, "txIndex", s.txIndex, "addr", addr, @@ -859,7 +834,7 @@ func (s *ParallelStateDB) SetState(addr common.Address, key, value common.Hash) } if s.parallel.kvChangesInSlot[addr] == nil { - s.parallel.kvChangesInSlot[addr] = make(StateKeys) // make(Storage, defaultNumOfSlots) + s.parallel.kvChangesInSlot[addr] = make(StateKeys) } if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { @@ -914,21 +889,18 @@ func (s *ParallelStateDB) SelfDestruct(addr common.Address) { s.journal.append(selfDestructChange{ account: &addr, - prev: object.selfDestructed, // todo: must be false? + prev: object.selfDestructed, prevbalance: new(big.Int).Set(s.GetBalance(addr)), }) if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { - // do copy-on-write for suicide "write" newStateObject := object.lightCopy(s) newStateObject.markSelfdestructed() newStateObject.setBalance(new(big.Int)) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - s.parallel.addrStateChangesInSlot[addr] = false // false: the address does not exist any more, - // s.parallel.nonceChangesInSlot[addr] = struct{}{} + s.parallel.addrStateChangesInSlot[addr] = false s.parallel.balanceChangesInSlot[addr] = struct{}{} s.parallel.codeChangesInSlot[addr] = struct{}{} - // s.parallel.kvChangesInSlot[addr] = make(StateKeys) // all key changes are discarded return } @@ -963,9 +935,9 @@ func (s *ParallelStateDB) CreateAccount(addr common.Address) { // no matter it is got from dirty, unconfirmed or main DB // if addr not exist, preBalance will be common.Big0, it is same as new(big.Int) which // is the value newObject(), - preBalance := s.GetBalance(addr) // parallel balance read will be recorded inside GetBalance + preBalance := s.GetBalance(addr) newObj := s.createObject(addr) - newObj.setBalance(new(big.Int).Set(preBalance)) // new big.Int for newObj + newObj.setBalance(new(big.Int).Set(preBalance)) } // RevertToSnapshot reverts all state changes made since the given revision. @@ -986,7 +958,7 @@ func (s *ParallelStateDB) RevertToSnapshot(revid int) { // AddRefund adds gas to the refund counter // journal.append will use ParallelState for revert -func (s *ParallelStateDB) AddRefund(gas uint64) { // todo: not needed, can be deleted +func (s *ParallelStateDB) AddRefund(gas uint64) { s.journal.append(refundChange{prev: s.refund}) s.refund += gas } @@ -1024,7 +996,7 @@ func (s *ParallelStateDB) getBalanceFromUnconfirmedDB(addr common.Address) *big. if _, exist := db.parallel.addrStateChangesInSlot[addr]; exist { balanceHit = true } - if _, exist := db.parallel.balanceChangesInSlot[addr]; exist { // only changed balance is reliable + if _, exist := db.parallel.balanceChangesInSlot[addr]; exist { balanceHit = true } if !balanceHit { @@ -1036,7 +1008,6 @@ func (s *ParallelStateDB) getBalanceFromUnconfirmedDB(addr common.Address) *big. balance = common.Big0 } return balance - } return nil } @@ -1063,8 +1034,6 @@ func (s *ParallelStateDB) getNonceFromUnconfirmedDB(addr common.Address) (uint64 // nonce hit, return the nonce obj := db.parallel.dirtiedStateObjectsInSlot[addr] if obj == nil { - // could not exist, if it is changed but reverted - // fixme: revert should remove the change record log.Debug("Get nonce from UnconfirmedDB, changed but object not exist, ", "txIndex", s.txIndex, "referred txIndex", i, "addr", addr) continue @@ -1102,8 +1071,6 @@ func (s *ParallelStateDB) getCodeFromUnconfirmedDB(addr common.Address) ([]byte, } obj := db.parallel.dirtiedStateObjectsInSlot[addr] if obj == nil { - // could not exist, if it is changed but reverted - // fixme: revert should remove the change record log.Debug("Get code from UnconfirmedDB, changed but object not exist, ", "txIndex", s.txIndex, "referred txIndex", i, "addr", addr) continue @@ -1140,8 +1107,6 @@ func (s *ParallelStateDB) getCodeHashFromUnconfirmedDB(addr common.Address) (com } obj := db.parallel.dirtiedStateObjectsInSlot[addr] if obj == nil { - // could not exist, if it is changed but reverted - // fixme: revert should remove the change record log.Debug("Get codeHash from UnconfirmedDB, changed but object not exist, ", "txIndex", s.txIndex, "referred txIndex", i, "addr", addr) continue @@ -1169,8 +1134,6 @@ func (s *ParallelStateDB) getAddrStateFromUnconfirmedDB(addr common.Address, tes db := db_.(*ParallelStateDB) if exist, ok := db.parallel.addrStateChangesInSlot[addr]; ok { if obj, ok := db.parallel.dirtiedStateObjectsInSlot[addr]; !ok { - // could not exist, if it is changed but reverted - // fixme: revert should remove the change record log.Debug("Get addr State from UnconfirmedDB, changed but object not exist, ", "txIndex", s.txIndex, "referred txIndex", i, "addr", addr) continue @@ -1250,7 +1213,6 @@ func (slotDB *ParallelStateDB) IsParallelReadsValid(isStage2 bool) bool { } } } - /* can not use mainDB.GetNonce() because we do not want to record the stateObject */ var nonceMain uint64 = 0 mainObj := mainDB.getStateObjectNoUpdate(addr) if mainObj != nil { @@ -1297,7 +1259,6 @@ func (slotDB *ParallelStateDB) IsParallelReadsValid(isStage2 bool) bool { }) } readLen := len(units) - // TODO-dav: change back to 8 or 1? if readLen < 80000 || isStage2 { for _, unit := range units { if hasKvConflict(slotDB, unit.addr, unit.key, unit.val, isStage2) { @@ -1439,7 +1400,7 @@ func (s *ParallelStateDB) NeedsRedo() bool { // FinaliseForParallel finalises the state by removing the destructed objects and clears // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. -// It also handle the mainDB dirties for the first TX. +// It also handles the mainDB dirties for the first TX. func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *StateDB) { addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties)) @@ -1449,12 +1410,6 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S var exist bool obj, exist = mainDB.getStateObjectFromStateObjects(addr) if !exist { - // ripeMD is 'touched' at block 1714175, in tx 0x1237f737031e40bcde4a8b7e717b2d15e3ecadfe49bb1bbc71ee9deb09c6fcf2 - // That tx goes out of gas, and although the notion of 'touched' does not exist there, the - // touch-event will still be recorded in the journal. Since ripeMD is a special snowflake, - // it will persist in the journal even though the journal is reverted. In this special circumstance, - // it may exist in `s.journal.dirties` but not in `s.stateObjects`. - // Thus, we can safely ignore it here continue } @@ -1490,7 +1445,7 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S mainDB.stateObjectsPending[addr] = struct{}{} mainDB.stateObjectsDirty[addr] = struct{}{} - // At this point, also ship the address off to the precacher. The precacher + // At this point, also ship the address off to the prefetch. The prefetcher // will start loading tries, and when the change is eventually committed, // the commit-phase will be a lot faster addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure @@ -1513,12 +1468,6 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S obj, exist = s.getStateObjectFromStateObjects(addr) } if !exist { - // ripeMD is 'touched' at block 1714175, in tx 0x1237f737031e40bcde4a8b7e717b2d15e3ecadfe49bb1bbc71ee9deb09c6fcf2 - // That tx goes out of gas, and although the notion of 'touched' does not exist there, the - // touch-event will still be recorded in the journal. Since ripeMD is a special snowflake, - // it will persist in the journal even though the journal is reverted. In this special circumstance, - // it may exist in `s.journal.dirties` but not in `s.stateObjects`. - // Thus, we can safely ignore it here continue } @@ -1572,7 +1521,7 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S s.stateObjectsPending[addr] = struct{}{} s.stateObjectsDirty[addr] = struct{}{} - // At this point, also ship the address off to the precacher. The precacher + // At this point, also ship the address off to the prefetcher. The prefetcher // will start loading tries, and when the change is eventually committed, // the commit-phase will be a lot faster addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure @@ -1584,121 +1533,3 @@ func (s *ParallelStateDB) FinaliseForParallel(deleteEmptyObjects bool, mainDB *S // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() } - -// IntermediateRootForSlotDB computes the current root hash of the state trie. -// It is called in between transactions to get the root hash that -// goes into transaction receipts. -// For parallel SlotDB, the intermediateRoot can be used to calculate the temporary root after executing single tx. -func (s *ParallelStateDB) IntermediateRootForSlotDB(deleteEmptyObjects bool, mainDB *StateDB) common.Hash { - // Finalise all the dirty storage states and write them into the tries - s.FinaliseForParallel(deleteEmptyObjects, mainDB) - - // If there was a trie prefetcher operating, it gets aborted and irrevocably - // modified after we start retrieving tries. Remove it from the statedb after - // this round of use. - // - // This is weird pre-byzantium since the first tx runs with a prefetcher and - // the remainder without, but pre-byzantium even the initial prefetcher is - // useless, so no sleep lost. - prefetcher := mainDB.prefetcher - if mainDB.prefetcher != nil { - defer func() { - mainDB.prefetcher.close() - mainDB.prefetcher = nil - }() - } - - if s.TxIndex() == 0 && len(mainDB.stateObjectsPending) > 0 { - for addr := range mainDB.stateObjectsPending { - var obj *stateObject - if obj, _ = mainDB.getStateObjectFromStateObjects(addr); !obj.deleted { - obj.updateRoot() - } - } - } - - // Although naively it makes sense to retrieve the account trie and then do - // the contract storage and account updates sequentially, that short circuits - // the account prefetcher. Instead, let's process all the storage updates - // first, giving the account prefetches just a few more milliseconds of time - // to pull useful data from disk. - for addr := range s.stateObjectsPending { - var obj *stateObject - if s.parallel.isSlotDB { - if obj = s.parallel.dirtiedStateObjectsInSlot[addr]; !obj.deleted { - obj.updateRoot() - } - } else { - if obj, _ = s.getStateObjectFromStateObjects(addr); !obj.deleted { - obj.updateRoot() - } - } - } - - // Now we're about to start to write changes to the trie. The trie is so far - // _untouched_. We can check with the prefetcher, if it can give us a trie - // which has the same root, but also has some content loaded into it. - // The parallel execution do the change incrementally, so can not check the prefetcher here - if prefetcher != nil { - if trie := prefetcher.trie(common.Hash{}, mainDB.originalRoot); trie != nil { - mainDB.trie = trie - } - } - - usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) - - if s.TxIndex() == 0 && len(mainDB.stateObjectsPending) > 0 { - usedAddrs = make([][]byte, 0, len(s.stateObjectsPending)+len(mainDB.stateObjectsPending)) - for addr := range mainDB.stateObjectsPending { - if obj, _ := mainDB.getStateObjectFromStateObjects(addr); obj.deleted { - mainDB.deleteStateObject(obj) - mainDB.AccountDeleted += 1 - } else { - mainDB.updateStateObject(obj) - mainDB.AccountUpdated += 1 - } - usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure - } - } - - for addr := range s.stateObjectsPending { - if s.parallel.isSlotDB { - if obj := s.parallel.dirtiedStateObjectsInSlot[addr]; obj.deleted { - mainDB.deleteStateObject(obj) - mainDB.AccountDeleted += 1 - } else { - mainDB.updateStateObject(obj) - mainDB.AccountUpdated += 1 - } - } else if obj, _ := s.getStateObjectFromStateObjects(addr); obj.deleted { - mainDB.deleteStateObject(obj) - mainDB.AccountDeleted += 1 - } else { - mainDB.updateStateObject(obj) - mainDB.AccountUpdated += 1 - } - usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure - } - - if prefetcher != nil { - prefetcher.used(common.Hash{}, mainDB.originalRoot, usedAddrs) - } - // parallel slotDB trie will be updated to mainDB since intermediateRoot happens after conflict check. - // so it should be save to clear pending here. - // otherwise there can be a case that the deleted object get ignored and processes as live object in verify phase. - - if s.TxIndex() == 0 && len(mainDB.stateObjectsPending) > 0 { - mainDB.stateObjectsPending = make(map[common.Address]struct{}) - } - - if /*s.isParallel == false &&*/ len(s.stateObjectsPending) > 0 { - s.stateObjectsPending = make(map[common.Address]struct{}) - } - // Track the amount of time wasted on hashing the account trie - if metrics.EnabledExpensive { - defer func(start time.Time) { mainDB.AccountHashes += time.Since(start) }(time.Now()) - } - ret := mainDB.trie.Hash() - - return ret -} diff --git a/core/state/snapshot/conversion.go b/core/state/snapshot/conversion.go index 9e07ee8523..26a0e128e0 100644 --- a/core/state/snapshot/conversion.go +++ b/core/state/snapshot/conversion.go @@ -243,7 +243,7 @@ func runReport(stats *generateStats, stop chan bool) { // generateTrieRoot generates the trie hash based on the snapshot iterator. // It can be used for generating account trie, storage trie or even the // whole state which connects the accounts and the corresponding storages. -func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accountExt common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) { +func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, account common.Hash, generatorFn trieGeneratorFn, leafCallback leafCallbackFn, stats *generateStats, report bool) (common.Hash, error) { var ( in = make(chan trieKV) // chan to pass leaves out = make(chan common.Hash, 1) // chan to collect result @@ -254,7 +254,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou wg.Add(1) go func() { defer wg.Done() - generatorFn(db, scheme, accountExt, in, out) + generatorFn(db, scheme, account, in, out) }() // Spin up a go-routine for progress logging if report && stats != nil { @@ -294,7 +294,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou ) // Start to feed leaves for it.Next() { - if accountExt == (common.Hash{}) { + if account == (common.Hash{}) { var ( err error fullData []byte @@ -324,12 +324,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou return } if account.Root != subroot { - - // results <- fmt.Errorf("invalid subroot(path %x), want %x, have %x", hash, account.Root, subroot) - - results <- fmt.Errorf("invalid subroot(path %x), want %x, have %x\n accountEXT: %s, account.ROOT: %v, codehash: %s\n", - hash, account.Root, subroot, accountExt.Hex(), account.Root, common.Bytes2Hex(account.CodeHash)) - + results <- fmt.Errorf("invalid subroot(path %x), want %x, have %x", hash, account.Root, subroot) return } results <- nil @@ -348,20 +343,20 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou // Accumulate the generation statistic if it's required. processed++ if time.Since(logged) > 3*time.Second && stats != nil { - if accountExt == (common.Hash{}) { + if account == (common.Hash{}) { stats.progressAccounts(it.Hash(), processed) } else { - stats.progressContract(accountExt, it.Hash(), processed) + stats.progressContract(account, it.Hash(), processed) } logged, processed = time.Now(), 0 } } // Commit the last part statistic. if processed > 0 && stats != nil { - if accountExt == (common.Hash{}) { + if account == (common.Hash{}) { stats.finishAccounts(processed) } else { - stats.finishContract(accountExt, processed) + stats.finishContract(account, processed) } } return stop(nil) diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index ba277f0d49..b6aca599c5 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -485,7 +485,6 @@ func (dl *diffLayer) flatten() snapshot { comboData[storageHash] = data } } - // Return the combo parent return &diffLayer{ parent: parent.parent, diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index fb706239d9..759298ccb5 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -361,6 +361,7 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs m // Save the new snapshot for later t.lock.Lock() defer t.lock.Unlock() + t.layers[snap.root] = snap return nil } @@ -403,6 +404,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error { diff.lock.RLock() base := diffToDisk(diff.flatten().(*diffLayer)) diff.lock.RUnlock() + // Replace the entire snapshot tree with the flat base t.layers = map[common.Hash]snapshot{base.root: base} return nil @@ -509,6 +511,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer { bottom.lock.RLock() base := diffToDisk(bottom) bottom.lock.RUnlock() + t.layers[base.root] = base diff.parent = base return base @@ -741,7 +744,6 @@ func (t *Tree) Rebuild(root common.Hash) { // Start generating a new snapshot from scratch on a background thread. The // generator will run a wiper first if there's not one running right now. log.Info("Rebuilding state snapshot") - t.layers = map[common.Hash]snapshot{ root: generateSnapshot(t.diskdb, t.triedb, t.config.CacheSize, root), } @@ -788,6 +790,7 @@ func (t *Tree) Verify(root common.Hash) error { return common.Hash{}, err } defer storageIt.Release() + hash, err := generateTrieRoot(nil, "", storageIt, accountHash, stackTrieGenerate, nil, stat, false) if err != nil { return common.Hash{}, err diff --git a/core/state/state_object.go b/core/state/state_object.go index 2c39efb6fe..2b097c0194 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -66,7 +66,6 @@ func (s StorageMap) Copy() Storage { for key, value := range s { cpy[key] = value } - return cpy } @@ -132,7 +131,6 @@ func (s *StorageSyncMap) Copy() Storage { cpy.Store(key, value) return true }) - return &cpy } @@ -225,7 +223,7 @@ func (s *stateObject) empty() bool { // since it could be invalid. // e.g., AddBalance() to an address, we will do lightCopy to get a new StateObject, we did balance fixup to // make sure object's Balance is reliable. But we did not fixup nonce or code, we only do nonce or codehash - // fixup on need, that's when we wanna to update the nonce or codehash. + // fixup on need, that's when we want to update the nonce or codehash. // So nonce, balance // Before the block is processed, addr_1 account: nonce = 0, emptyCodeHash, balance = 100 // Slot 0 tx 0: no access to addr_1 @@ -240,7 +238,6 @@ func (s *stateObject) empty() bool { } codeHash := s.dbItf.GetCodeHash(s.address) return bytes.Equal(codeHash.Bytes(), emptyCodeHash) // code is empty, the object is empty - } // newObject creates a state object. @@ -350,7 +347,6 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { } if s.db.isParallel && s.db.parallel.isSlotDB { - // Add-Dav: // Need to confirm the object is not destructed in unconfirmed db and resurrected in this tx. // otherwise there is an issue for cases like: // B0: TX0 --> createAccount @addr1 -- merged into DB @@ -358,8 +354,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { // Tx1 account@addr1, setState(key0), setState(key1) selfDestruct -- unconfirmed // Tx2 recreate account@addr2, setState(key0) -- executing // TX2 GetState(addr2, key1) --- - // key1 is never set after recurrsect, and should not return state in trie as it destructed in unconfirmed - // TODO - dav: do we need try storages from unconfirmedDB? - currently not because conflict detection need it for get from mainDB. + // key1 is never set after resurrect, and should not return state in trie as it destructed in unconfirmed obj, exist := s.dbItf.GetStateObjectFromUnconfirmedDB(s.address) if exist { if obj.deleted || obj.selfDestructed { @@ -382,7 +377,6 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { // 1) resurrect happened, and new slot values were set -- those should // have been handles via pendingStorage above. // 2) we don't have new values, and can deliver empty response back - //if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed { s.db.stateObjectDestructLock.RLock() if _, destructed := s.db.getStateObjectsDegetstruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex? s.db.stateObjectDestructLock.RUnlock() @@ -456,7 +450,7 @@ func (s *stateObject) SetState(key, value common.Hash) { }) if s.db.isParallel && s.db.parallel.isSlotDB { - s.db.parallel.kvChangesInSlot[s.address][key] = struct{}{} // should be moved to here, after `s.db.GetState()` + s.db.parallel.kvChangesInSlot[s.address][key] = struct{}{} } s.setState(key, value) } @@ -480,7 +474,6 @@ func (s *stateObject) finalise(prefetch bool) { } return true }) - if s.dirtyNonce != nil { s.data.Nonce = *s.dirtyNonce s.dirtyNonce = nil @@ -652,73 +645,6 @@ func (s *stateObject) updateTrie() (Trie, error) { s.pendingStorage = newStorage(s.isParallel) // reset pending map return tr, nil - /* - s.pendingStorage.Range(func(keyItf, valueItf interface{}) bool { - key := keyItf.(common.Hash) - value := valueItf.(common.Hash) - // Skip noop changes, persist actual changes - originalValue, _ := s.originStorage.GetValue(key) - if value == originalValue { - return true - } - - prev, _ := s.originStorage.GetValue(key) - s.originStorage.StoreValue(key, value) - - var encoded []byte // rlp-encoded value to be used by the snapshot - if (value == common.Hash{}) { - if err := tr.DeleteStorage(s.address, key[:]); err != nil { - maindb.setError(err) - } - maindb.StorageDeleted += 1 - } else { - // Encoding []byte cannot fail, ok to ignore the error. - trimmed := common.TrimLeftZeroes(value[:]) - encoded, _ = rlp.EncodeToBytes(trimmed) - if err := tr.UpdateStorage(s.address, key[:], trimmed); err != nil { - maindb.setError(err) - } - maindb.StorageUpdated += 1 - } - // Cache the mutated storage slots until commit - if storage == nil { - if storage = maindb.storages[s.addrHash]; storage == nil { - storage = make(map[common.Hash][]byte) - maindb.storages[s.addrHash] = storage - } - } - - khash := crypto.HashData(maindb.hasher, key[:]) - storage[khash] = encoded // encoded will be nil if it's deleted - - // Cache the original value of mutated storage slots - if origin == nil { - if origin = maindb.storagesOrigin[s.address]; origin == nil { - origin = make(map[common.Hash][]byte) - maindb.storagesOrigin[s.address] = origin - } - } - // Track the original value of slot only if it's mutated first time - if _, ok := origin[khash]; !ok { - if prev == (common.Hash{}) { - origin[khash] = nil // nil if it was not present previously - } else { - // Encoding []byte cannot fail, ok to ignore the error. - b, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(prev[:])) - origin[khash] = b - } - } - // Cache the items for preloading - usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure - return true - }) - if maindb.prefetcher != nil { - maindb.prefetcher.used(s.addrHash, s.data.Root, usedStorage) - } - s.pendingStorage = newStorage(s.isParallel) // reset pending map - - return tr, nil - */ } // updateRoot flushes all cached storage mutations to trie, recalculating the @@ -726,7 +652,6 @@ func (s *stateObject) updateTrie() (Trie, error) { func (s *stateObject) updateRoot() { // Flush cached storage mutations into trie, short circuit if any error // is occurred or there is not change in the trie. - // TODO: The trieParallelLock seems heavy, can we remove it? s.db.trieParallelLock.Lock() defer s.db.trieParallelLock.Unlock() @@ -762,10 +687,8 @@ func (s *stateObject) commit() (*trienode.NodeSet, error) { return nil, err } s.data.Root = root - // Update original account data after commit s.origin = s.data.Copy() - return nodes, nil } @@ -860,7 +783,6 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject { } object.code = s.code - // The lock is unnecessary since deepCopy only invoked at global phase and with dirty object that never changed. object.dirtyStorage = s.dirtyStorage.Copy() object.originStorage = s.originStorage.Copy() object.pendingStorage = s.pendingStorage.Copy() @@ -870,7 +792,6 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject { object.dirtyBalance = s.dirtyBalance object.dirtyNonce = s.dirtyNonce object.dirtyCodeHash = s.dirtyCodeHash - return object } diff --git a/core/state/statedb.go b/core/state/statedb.go index e48473467a..b1cecbd372 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -98,7 +98,6 @@ func (s *StateDB) storeStateObj(addr common.Address, stateObject *stateObject) { if s.isParallel { // When a state object is stored into s.parallel.stateObjects, // it belongs to base StateDB, it is confirmed and valid. - // TODO-dav: remove the lock/unlock? s.parallelStateAccessLock.Lock() s.parallel.stateObjects.StoreStateObject(addr, stateObject) s.parallelStateAccessLock.Unlock() @@ -121,20 +120,16 @@ func (s *StateDB) deleteStateObj(addr common.Address) { // ParallelState is for parallel mode only type ParallelState struct { isSlotDB bool // denotes StateDB is used in slot, we will try to remove it - SlotIndex int // for debug, to be removed + SlotIndex int // for debug // stateObjects holds the state objects in the base slot db - // the reason for using stateObjects instead of stateObjects on the outside is - // we need a thread safe map to hold state objects since there are many slots will read - // state objects from it; - // And we will merge all the changes made by the concurrent slot into it. stateObjects *StateObjectSyncMap baseStateDB *StateDB // for parallel mode, there will be a base StateDB in dispatcher routine. baseTxIndex int // slotDB is created base on this tx index. dirtiedStateObjectsInSlot map[common.Address]*stateObject - unconfirmedDBs *sync.Map /*map[int]*ParallelStateDB*/ // do unconfirmed reference in same slot. + unconfirmedDBs *sync.Map // do unconfirmed reference in same slot. - // we will record the read detail for conflict check and + // record the read detail for conflict check and // the changed addr or key for object merge, the changed detail can be achieved from the dirty object nonceChangesInSlot map[common.Address]struct{} nonceReadsInSlot map[common.Address]uint64 @@ -160,17 +155,7 @@ type ParallelState struct { storagesDeleteRecord []common.Hash accountsOriginDeleteRecord []common.Address storagesOriginDeleteRecord []common.Address - - createdObjectRecord map[common.Address]struct{} - - // Transaction will pay gas fee to system address. - // Parallel execution will clear system address's balance at first, in order to maintain transaction's - // gas fee value. Normal transaction will access system address twice, otherwise it means the transaction - // needs real system address's balance, the transaction will be marked redo with keepSystemAddressBalance = true - // systemAddress common.Address - // systemAddressOpsCount int - // keepSystemAddressBalance bool - + createdObjectRecord map[common.Address]struct{} // we may need to redo for some specific reasons, like we read the wrong state and need to panic in sequential mode in SubRefund needsRedo bool } @@ -512,10 +497,6 @@ func (s *StateDB) GetBalance(addr common.Address) (ret *big.Int) { return common.Big0 } -func (s *StateDB) GetBalanceOpCode(addr common.Address) *big.Int { - return s.GetBalance(addr) -} - // GetNonce retrieves the nonce from the given address or 0 if object not found func (s *StateDB) GetNonce(addr common.Address) (ret uint64) { defer func() { @@ -685,9 +666,6 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common // it in stateObjectsDestruct. The effect of doing so is that storage lookups // will not hit disk, since it is assumed that the disk-data is belonging // to a previous incarnation of the object. - // - // TODO(rjl493456442) this function should only be supported by 'unwritable' - // state and all mutations made should all be discarded afterwards. if _, ok := s.getStateObjectsDegetstruct(addr); !ok { s.setStateObjectsDestruct(addr, nil) } @@ -1408,7 +1386,6 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB { // handle tx1, so what thread1's slotDB see in the s.parallel.stateObjects might be the middle result of Thread2. // // We are not do simple copy (lightweight pointer copy) as the stateObject can be accessed by different thread. - // Todo-dav: remove lock guard of parallel.stateObject access. stateObjects: &StateObjectSyncMap{}, // s.parallel.stateObjects, codeReadsInSlot: addressToBytesPool.Get().(map[common.Address][]byte), @@ -1472,32 +1449,12 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB { // and force the miner to operate trie-backed only state.snaps = s.snaps state.snap = s.snap - // deep copy needed state.snapDestructs = addressToStructPool.Get().(map[common.Address]struct{}) s.snapParallelLock.RLock() for k, v := range s.snapDestructs { state.snapDestructs[k] = v } s.snapParallelLock.RUnlock() - // snapAccounts is useless in SlotDB, comment out and remove later - // state.snapAccounts = make(map[common.Address][]byte) // snapAccountPool.Get().(map[common.Address][]byte) - // for k, v := range s.snapAccounts { - // state.snapAccounts[k] = v - // } - - // snapStorage is useless in SlotDB either, it is updated on updateTrie, which is validation phase to update the snapshot of a finalized block. - // state.snapStorage = snapStoragePool.Get().(map[common.Address]map[string][]byte) - // for k, v := range s.snapStorage { - // temp := snapStorageValuePool.Get().(map[string][]byte) - // for kk, vv := range v { - // temp[kk] = vv - // } - // state.snapStorage[k] = temp - // } - - // trie prefetch should be done by dispatcher on StateObject Merge, - // disable it in parallel slot - // state.prefetcher = s.prefetcher } // Deep copy the state changes made in the scope of block @@ -2845,7 +2802,3 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip s.SetTxContext(slotDb.thash, slotDb.txIndex) return s } - -func (s *StateDB) ParallelMakeUp(common.Address, []byte) { - // do nothing, this API is for parallel mode -} diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 35cfe36fad..b5f8058114 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -46,7 +46,7 @@ import ( ) var ( - systemAddress = common.HexToAddress("0xffffFFFfFFffffffffffffffFfFFFfffFFFfFFfE") + testAddress = common.HexToAddress("0xffffFFFfFFffffffffffffffFfFFFfffFFFfFFfE") ) // Tests that updating a state trie does not leak any database writes prior to @@ -1382,7 +1382,7 @@ func TestSetAndGetBalance(t *testing.T) { db := NewDatabase(memDb) state, _ := New(common.Hash{}, db, nil) - addr := systemAddress + addr := testAddress state.SetBalance(addr, big.NewInt(1)) state.PrepareForParallel() unconfirmedDBs := new(sync.Map) @@ -1417,7 +1417,7 @@ func TestSubBalance(t *testing.T) { memDb := rawdb.NewMemoryDatabase() db := NewDatabase(memDb) state, _ := New(common.Hash{}, db, nil) - addr := systemAddress + addr := testAddress state.SetBalance(addr, big.NewInt(2)) state.PrepareForParallel() @@ -1452,7 +1452,7 @@ func TestAddBalance(t *testing.T) { memDb := rawdb.NewMemoryDatabase() db := NewDatabase(memDb) state, _ := New(common.Hash{}, db, nil) - addr := systemAddress + addr := testAddress state.SetBalance(addr, big.NewInt(2)) state.PrepareForParallel() unconfirmedDBs := new(sync.Map) @@ -1486,7 +1486,7 @@ func TestEmpty(t *testing.T) { memDb := rawdb.NewMemoryDatabase() db := NewDatabase(memDb) state, _ := New(common.Hash{}, db, nil) - addr := systemAddress + addr := testAddress state.SetBalance(addr, big.NewInt(2)) state.PrepareForParallel() @@ -1507,7 +1507,7 @@ func TestExist(t *testing.T) { memDb := rawdb.NewMemoryDatabase() db := NewDatabase(memDb) state, _ := New(common.Hash{}, db, nil) - addr := systemAddress + addr := testAddress state.SetBalance(addr, big.NewInt(2)) state.PrepareForParallel() unconfirmedDBs := new(sync.Map) @@ -1534,7 +1534,7 @@ func TestMergeSlotDB(t *testing.T) { newSlotDb := NewSlotDB(state, 0, 0, unconfirmedDBs) - addr := systemAddress + addr := testAddress newSlotDb.SetBalance(addr, big.NewInt(2)) newSlotDb.SetState(addr, common.BytesToHash([]byte("test key")), common.BytesToHash([]byte("test store"))) newSlotDb.SetCode(addr, []byte("test code")) diff --git a/core/state_processor.go b/core/state_processor.go index 58d178afc6..b972f6b7b2 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -129,10 +129,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals) if p.bc.enableTxDAG { - // compare input TxDAG when it enable in consensus + // compare input TxDAG when it is enabled in consensus dag, err := statedb.ResolveTxDAG(len(block.Transactions()), []common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) if err == nil { - // TODO(galaio): check TxDAG correctness? log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag) if metrics.EnabledExpensive { types.EvaluateTxDAGPerformance(dag, statedb.ResolveStats()) @@ -160,6 +159,7 @@ func applyTransaction(msg *Message, config *params.ChainConfig, gp *GasPool, sta if msg.IsDepositTx && config.IsOptimismRegolith(evm.Context.Time) { nonce = statedb.GetNonce(msg.From) } + // Apply the transaction to the current state (included in the env). result, err := ApplyMessage(evm, msg, gp) if err != nil { diff --git a/core/state_processor_test.go b/core/state_processor_test.go index a32f0b822e..1edf380e7c 100644 --- a/core/state_processor_test.go +++ b/core/state_processor_test.go @@ -18,7 +18,6 @@ package core import ( "crypto/ecdsa" - "github.com/holiman/uint256" "math/big" "testing" @@ -35,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" + "github.com/holiman/uint256" "golang.org/x/crypto/sha3" ) @@ -73,7 +73,6 @@ func TestStateProcessorErrors(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(nonce, to, amount, gasLimit, gasPrice, data), signer, key) return tx } - var mkDynamicTx = func(nonce uint64, to common.Address, gasLimit uint64, gasTipCap, gasFeeCap *big.Int) *types.Transaction { tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ Nonce: nonce, @@ -147,7 +146,6 @@ func TestStateProcessorErrors(t *testing.T) { }, want: "could not apply tx 1 [0x0026256b3939ed97e2c4a6f3fce8ecf83bdcfa6d507c47838c308a1fb0436f62]: nonce too low: address 0x71562b71999873DB5b286dF957af199Ec94617F7, tx: 0 state: 1", }, - { // ErrNonceTooHigh txs: []*types.Transaction{ makeTx(key1, 100, common.Address{}, big.NewInt(0), params.TxGas, big.NewInt(875000000), nil), @@ -313,6 +311,7 @@ func TestStateProcessorErrors(t *testing.T) { } } } + // ErrSenderNoEOA, for this we need the sender to have contract code { var ( diff --git a/core/state_transition.go b/core/state_transition.go index 9e96f7d611..b85ca61128 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -541,7 +541,6 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { // is always 0 for deposit tx. So calling refundGas will ensure the gasUsed accounting is correct without actually // changing the sender's balance var gasRefund uint64 - if !rules.IsLondon { // Before EIP-3529: refunds were capped to gasUsed / 2 gasRefund = st.refundGas(params.RefundQuotient) diff --git a/core/types/block.go b/core/types/block.go index b32931b054..1a357baa3a 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -171,8 +171,6 @@ type Body struct { Transactions []*Transaction Uncles []*Header Withdrawals []*Withdrawal `rlp:"optional"` - // TODO: add TxDAG in block body - //TxDAG []byte `rlp:"optional"` } // Block represents an Ethereum block. diff --git a/core/types/receipt.go b/core/types/receipt.go index 0352e83559..1b7d03459a 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -18,7 +18,6 @@ package types import ( "bytes" - "encoding/json" "errors" "fmt" "io" @@ -586,11 +585,3 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu } return nil } - -// Debug PrettyPrint -func (r Receipt) PrettyPrint() (string, error) { - b, err := r.MarshalJSON() - var prettyJSON bytes.Buffer - json.Indent(&prettyJSON, b, "", "\t") - return prettyJSON.String(), err -} diff --git a/core/vm/evm.go b/core/vm/evm.go index 4595855576..2d275e432e 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -260,7 +260,6 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas contract.optimized, code = tryGetOptimizedCode(evm, codeHash, code) contract.SetCallCode(&addrCopy, codeHash, code) ret, err = evm.interpreter.Run(contract, input, false) - evm.StateDB.ParallelMakeUp(addr, input) gas = contract.Gas } else { addrCopy := addr @@ -269,7 +268,6 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas contract := NewContract(caller, AccountRef(addrCopy), value, gas) contract.SetCallCode(&addrCopy, evm.StateDB.GetCodeHash(addrCopy), code) ret, err = evm.interpreter.Run(contract, input, false) - evm.StateDB.ParallelMakeUp(addr, input) gas = contract.Gas } } @@ -524,18 +522,14 @@ func (evm *EVM) create(caller ContractRef, codeAndHash *codeAndHash, gas uint64, return nil, common.Address{}, gas, ErrNonceUintOverflow } evm.StateDB.SetNonce(caller.Address(), nonce+1) - // We add this to the access list _before_ taking a snapshot. Even if the creation fails, // the access-list change should not be rolled back if evm.chainRules.IsBerlin { evm.StateDB.AddAddressToAccessList(address) } - // Ensure there's no existing contract already at the designated address contractHash := evm.StateDB.GetCodeHash(address) - // debug - no := evm.StateDB.GetNonce(address) - if no != 0 || (contractHash != (common.Hash{}) && contractHash != types.EmptyCodeHash) { + if evm.StateDB.GetNonce(address) != 0 || (contractHash != (common.Hash{}) && contractHash != types.EmptyCodeHash) { return nil, common.Address{}, 0, ErrContractAddressCollision } // Create a new account on the state diff --git a/core/vm/gas_table.go b/core/vm/gas_table.go index f6dd7c2377..4b141d8f9a 100644 --- a/core/vm/gas_table.go +++ b/core/vm/gas_table.go @@ -18,6 +18,7 @@ package vm import ( "errors" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/params" diff --git a/core/vm/instructions.go b/core/vm/instructions.go index e71ac58422..8dd82aa1fa 100644 --- a/core/vm/instructions.go +++ b/core/vm/instructions.go @@ -259,7 +259,7 @@ func opAddress(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([] func opBalance(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) { slot := scope.Stack.peek() address := common.Address(slot.Bytes20()) - slot.SetFromBig(interpreter.evm.StateDB.GetBalanceOpCode(address)) + slot.SetFromBig(interpreter.evm.StateDB.GetBalance(address)) return nil, nil } @@ -818,7 +818,6 @@ func opSelfdestruct(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext if interpreter.readOnly { return nil, ErrWriteProtection } - beneficiary := scope.Stack.pop() balance := interpreter.evm.StateDB.GetBalance(scope.Contract.Address()) interpreter.evm.StateDB.AddBalance(beneficiary.Bytes20(), balance) diff --git a/core/vm/interface.go b/core/vm/interface.go index 749c5eda68..77a3f35b66 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -31,7 +31,6 @@ type StateDB interface { SubBalance(common.Address, *big.Int) AddBalance(common.Address, *big.Int) GetBalance(common.Address) *big.Int - GetBalanceOpCode(common.Address) *big.Int GetNonce(common.Address) uint64 SetNonce(common.Address, uint64) @@ -79,10 +78,6 @@ type StateDB interface { AddLog(*types.Log) AddPreimage(common.Hash, []byte) - - ParallelMakeUp(addr common.Address, input []byte) - - // todo -dav : delete following TxIndex() int // parallel DAG related diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 9c5443d19e..b18fef42da 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -176,7 +176,6 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) ( } }() } - // The Interpreter main run loop (contextual). This loop runs until either an // explicit STOP, RETURN or SELFDESTRUCT is executed, an error occurred during // the execution of one of the operations or until the done flag is set by the @@ -200,7 +199,6 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) ( if !contract.UseGas(cost) { return nil, ErrOutOfGas } - if operation.dynamicGas != nil { // All ops with a dynamic memory usage also has a dynamic gas cost. var memorySize uint64 @@ -246,8 +244,10 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) ( } pc++ } + if err == errStopToken { err = nil // clear stop token error } + return res, err } diff --git a/core/vm/operations_acl.go b/core/vm/operations_acl.go index 66f6e6322a..04c6409ebd 100644 --- a/core/vm/operations_acl.go +++ b/core/vm/operations_acl.go @@ -18,6 +18,7 @@ package vm import ( "errors" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/params" @@ -36,7 +37,6 @@ func makeGasSStoreFunc(clearingRefund uint64) gasFunc { current = evm.StateDB.GetState(contract.Address(), slot) cost = uint64(0) ) - // Check slot presence in the access list if addrPresent, slotPresent := evm.StateDB.SlotInAccessList(contract.Address(), slot); !slotPresent { cost = params.ColdSloadCostEIP2929 @@ -50,6 +50,7 @@ func makeGasSStoreFunc(clearingRefund uint64) gasFunc { } } value := common.Hash(y.Bytes32()) + if current == value { // noop (1) // EIP 2200 original clause: // return params.SloadGasEIP2200, nil diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 4e8116f0db..0509eed75b 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -64,6 +64,7 @@ func init() { fsHeaderContCheck = 500 * time.Millisecond testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis) + var forkLen = int(fullMaxForkAncestry + 50) var wg sync.WaitGroup diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 7c43bae9d0..6dbe03e214 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -140,7 +140,6 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td if h.merger.PoSFinalized() { return errors.New("disallowed block broadcast") } - // Schedule the block for import h.blockFetcher.Enqueue(peer.ID(), block) diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index 4530097a2c..7e3f82a075 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -5,7 +5,6 @@ package exp import ( "expvar" "fmt" - "github.com/prometheus/client_golang/prometheus/promhttp" "net/http" "sync" @@ -45,7 +44,6 @@ func Exp(r metrics.Registry) { // http.HandleFunc("/debug/vars", e.expHandler) // haven't found an elegant way, so just use a different endpoint http.Handle("/debug/metrics", h) - http.Handle("/debug/metrics/go_prometheus", promhttp.Handler()) http.Handle("/debug/metrics/prometheus", prometheus.Handler(r)) } @@ -60,7 +58,6 @@ func ExpHandler(r metrics.Registry) http.Handler { func Setup(address string) { m := http.NewServeMux() m.Handle("/debug/metrics", ExpHandler(metrics.DefaultRegistry)) - m.Handle("/debug/metrics/go_prometheus", promhttp.Handler()) m.Handle("/debug/metrics/prometheus", prometheus.Handler(metrics.DefaultRegistry)) log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/debug/metrics", address)) go func() { diff --git a/miner/worker.go b/miner/worker.go index 7b0dd9d73c..5c535a576e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1219,15 +1219,6 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { return &newPayloadResult{err: fmt.Errorf("empty block root")} } - // TODO(galaio): fulfill TxDAG to mined block - //if w.chain.TxDAGEnabled() && w.chainConfig.Optimism != nil { - // txDAG, _ := work.state.ResolveTxDAG([]common.Address{work.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) - // rawTxDAG, err := types.EncodeTxDAG(txDAG) - // if err != nil { - // return &newPayloadResult{err: err} - // } - //} - assembleBlockTimer.UpdateSince(start) log.Debug("assembleBlockTimer", "duration", common.PrettyDuration(time.Since(start)), "parentHash", genParams.parentHash) @@ -1335,7 +1326,6 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti if err != nil { return err } - // If we're post merge, just ignore if !w.isTTDReached(block.Header()) { select { diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index a352020359..4139dfc8b3 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -395,14 +395,11 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error start = time.Now() dl = db.tree.bottom() ) - // fmt.Printf("Dav -- pathdb Recover, dl, root: %s\n", dl.rootHash()) for dl.rootHash() != root { - // fmt.Printf("Dav -- pathdb Recover, not equal, dl.root %s, root: %s\n", dl.rootHash(), root) h, err := readHistory(db.freezer, dl.stateID()) if err != nil { return err } - dl, err = dl.revert(h, loader) if err != nil { return err diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index fc183b777b..60c891fd2c 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -377,7 +377,6 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer // Apply the reverse state changes upon the current state. This must // be done before holding the lock in order to access state in "this" // layer. - nodes, err := triestate.Apply(h.meta.parent, h.meta.root, h.accounts, h.storages, loader) if err != nil { return nil, err