diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index fb35ac3c8d..5aaef959d8 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -851,31 +851,31 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti // different type with a lower nonce. // If we decide not to send this tx yet, just leave it queued and with Sent set to false. // The resending/repricing loop in DataPoster.Start will keep trying. - if !newTx.Sent && newTx.FullTx.Nonce() > 0 { + previouslySent := newTx.Sent || (prevTx != nil && prevTx.Sent) // if we've previously sent this nonce + if !previouslySent && newTx.FullTx.Nonce() > 0 { precedingTx, err := p.queue.Get(ctx, arbmath.SaturatingUSub(newTx.FullTx.Nonce(), 1)) if err != nil { return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed - var latestBlockNumber, prevBlockNumber, reorgResistantNonce uint64 + var latestBlockNumber, prevBlockNumber, reorgResistantTxCount uint64 if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent { latestBlockNumber, err = p.client.BlockNumber(ctx) if err != nil { return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1) - reorgResistantNonce, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) + reorgResistantTxCount, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber)) if err != nil { return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err) } - if precedingTx.FullTx.Nonce() > reorgResistantNonce { - log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent) + if newTx.FullTx.Nonce() > reorgResistantTxCount { + log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount) return nil } - } else { - log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantNonce", reorgResistantNonce) } + log.Debug("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount) } } diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go index bb6de00cad..1e4a06fe90 100644 --- a/blocks_reexecutor/blocks_reexecutor.go +++ b/blocks_reexecutor/blocks_reexecutor.go @@ -35,17 +35,16 @@ func (c *Config) Validate() error { if c.EndBlock < c.StartBlock { return errors.New("invalid block range for blocks re-execution") } - if c.Room == 0 { - return errors.New("room for blocks re-execution cannot be zero") + if c.Room <= 0 { + return errors.New("room for blocks re-execution should be greater than 0") } return nil } var DefaultConfig = Config{ - Enable: false, - Mode: "random", - Room: runtime.NumCPU(), - BlocksPerThread: 10000, + Enable: false, + Mode: "random", + Room: runtime.NumCPU(), } var TestConfig = Config{ @@ -66,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { type BlocksReExecutor struct { stopwaiter.StopWaiter - config *Config - blockchain *core.BlockChain - stateFor arbitrum.StateForHeaderFunction - done chan struct{} - fatalErrChan chan error - startBlock uint64 - currentBlock uint64 + config *Config + blockchain *core.BlockChain + stateFor arbitrum.StateForHeaderFunction + done chan struct{} + fatalErrChan chan error + startBlock uint64 + currentBlock uint64 + blocksPerThread uint64 } func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor { @@ -84,32 +84,47 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block start = chainStart end = chainEnd } - if start < chainStart { - log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis") + if start < chainStart || start > chainEnd { + log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart) start = chainStart } - if end > chainEnd { - log.Warn("state reexecutor's end block number is greater than latest, resetting to latest") + if end > chainEnd || end < chainStart { + log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd) end = chainEnd } + blocksPerThread := uint64(10000) + if c.BlocksPerThread != 0 { + blocksPerThread = c.BlocksPerThread + } if c.Mode == "random" && end != start { - if c.BlocksPerThread > end-start { - c.BlocksPerThread = end - start + // Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly + rng := blocksPerThread + if rng > end-start { + rng = end - start } - start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1))) - end = start + c.BlocksPerThread + start += uint64(rand.Intn(int(end - start - rng + 1))) + end = start + rng } - // inclusive of block reexecution [start, end] - if start > 0 { + // Inclusive of block reexecution [start, end] + // Do not reexecute genesis block i,e chainStart + if start > 0 && start != chainStart { start-- } + // Divide work equally among available threads when BlocksPerThread is zero + if c.BlocksPerThread == 0 { + work := (end - start) / uint64(c.Room) + if work > 0 { + blocksPerThread = work + } + } return &BlocksReExecutor{ - config: c, - blockchain: blockchain, - currentBlock: end, - startBlock: start, - done: make(chan struct{}, c.Room), - fatalErrChan: fatalErrChan, + config: c, + blockchain: blockchain, + currentBlock: end, + startBlock: start, + blocksPerThread: blocksPerThread, + done: make(chan struct{}, c.Room), + fatalErrChan: fatalErrChan, stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) { state, err := blockchain.StateAt(header.Root) return state, arbitrum.NoopStateRelease, err @@ -119,17 +134,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block // LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 { - start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread) + start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread) if start < s.startBlock { start = s.startBlock } - // we don't use state release pattern here - // TODO do we want to use release pattern here? - startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1) + startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1) if err != nil { s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err) return s.startBlock } + // NoOp + defer release() start = startHeader.Number.Uint64() s.LaunchThread(func(ctx context.Context) { _, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil) @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) { log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end) } -func (s *BlocksReExecutor) Start(ctx context.Context) { +func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) { s.StopWaiter.Start(ctx, s) - s.LaunchThread(s.Impl) + s.LaunchThread(func(ctx context.Context) { + s.Impl(ctx) + if done != nil { + close(done) + } + }) } func (s *BlocksReExecutor) StopAndWait() { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 815257cf7a..bb92fa8d99 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -509,6 +509,25 @@ func mainImpl() int { return 1 } + fatalErrChan := make(chan error, 10) + + var blocksReExecutor *blocksreexecutor.BlocksReExecutor + if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil { + blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan) + if nodeConfig.Init.ThenQuit { + success := make(chan struct{}) + blocksReExecutor.Start(ctx, success) + deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() }) + select { + case err := <-fatalErrChan: + log.Error("shutting down due to fatal error", "err", err) + defer log.Error("shut down due to fatal error", "err", err) + return 1 + case <-success: + } + } + } + if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 { return 0 } @@ -529,8 +548,6 @@ func mainImpl() int { return 1 } - fatalErrChan := make(chan error, 10) - var valNode *valnode.ValidationNode if sameProcessValidationNodeEnabled { valNode, err = valnode.CreateValidationNode( @@ -659,9 +676,8 @@ func mainImpl() int { // remove previous deferFuncs, StopAndWait closes database and blockchain. deferFuncs = []func(){func() { currentNode.StopAndWait() }} } - if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil { - blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan) - blocksReExecutor.Start(ctx) + if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit { + blocksReExecutor.Start(ctx, nil) deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() }) } diff --git a/staker/block_validator.go b/staker/block_validator.go index 027ee78248..5a511920f2 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -791,8 +791,9 @@ validationsLoop: } for _, moduleRoot := range wasmRoots { if v.chosenValidator[moduleRoot] == nil { - v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot)) - continue + notFoundErr := fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot) + v.possiblyFatal(notFoundErr) + return nil, notFoundErr } if v.chosenValidator[moduleRoot].Room() == 0 { log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot) @@ -1107,7 +1108,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { } } if v.chosenValidator[root] == nil { - log.Error("validator not found", "WasmModuleRoot", root) + return fmt.Errorf("cannot validate WasmModuleRoot %v", root) } } } diff --git a/system_tests/blocks_reexecutor_test.go b/system_tests/blocks_reexecutor_test.go index c2941ddcc4..66690d1427 100644 --- a/system_tests/blocks_reexecutor_test.go +++ b/system_tests/blocks_reexecutor_test.go @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) { } } + // Reexecute blocks at mode full success := make(chan struct{}) + executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan) + executorFull.Start(ctx, success) - // Reexecute blocks at mode full - go func() { - executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan) - executorFull.StopWaiter.Start(ctx, executorFull) - executorFull.Impl(ctx) - executorFull.StopAndWait() - success <- struct{}{} - }() select { case err := <-feedErrChan: t.Errorf("error occurred: %v", err) @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) { } // Reexecute blocks at mode random - go func() { - c := &blocksreexecutor.TestConfig - c.Mode = "random" - executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan) - executorRandom.StopWaiter.Start(ctx, executorRandom) - executorRandom.Impl(ctx) - executorRandom.StopAndWait() - success <- struct{}{} - }() + success = make(chan struct{}) + c := &blocksreexecutor.TestConfig + c.Mode = "random" + executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan) + executorRandom.Start(ctx, success) + select { case err := <-feedErrChan: t.Errorf("error occurred: %v", err)