Skip to content

Commit

Permalink
Merge branch 'master' into validator-gracefull-shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar authored May 23, 2024
2 parents 56fc8d4 + ed3f9e0 commit 717b00b
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 67 deletions.
14 changes: 7 additions & 7 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,31 +851,31 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti
// different type with a lower nonce.
// If we decide not to send this tx yet, just leave it queued and with Sent set to false.
// The resending/repricing loop in DataPoster.Start will keep trying.
if !newTx.Sent && newTx.FullTx.Nonce() > 0 {
previouslySent := newTx.Sent || (prevTx != nil && prevTx.Sent) // if we've previously sent this nonce
if !previouslySent && newTx.FullTx.Nonce() > 0 {
precedingTx, err := p.queue.Get(ctx, arbmath.SaturatingUSub(newTx.FullTx.Nonce(), 1))
if err != nil {
return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed
var latestBlockNumber, prevBlockNumber, reorgResistantNonce uint64
var latestBlockNumber, prevBlockNumber, reorgResistantTxCount uint64
if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent {
latestBlockNumber, err = p.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1)
reorgResistantNonce, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
reorgResistantTxCount, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
if err != nil {
return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}

if precedingTx.FullTx.Nonce() > reorgResistantNonce {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent)
if newTx.FullTx.Nonce() > reorgResistantTxCount {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount)
return nil
}
} else {
log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantNonce", reorgResistantNonce)
}
log.Debug("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantTxCount", reorgResistantTxCount)
}
}

Expand Down
90 changes: 55 additions & 35 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ func (c *Config) Validate() error {
if c.EndBlock < c.StartBlock {
return errors.New("invalid block range for blocks re-execution")
}
if c.Room == 0 {
return errors.New("room for blocks re-execution cannot be zero")
if c.Room <= 0 {
return errors.New("room for blocks re-execution should be greater than 0")
}
return nil
}

var DefaultConfig = Config{
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
BlocksPerThread: 10000,
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
}

var TestConfig = Config{
Expand All @@ -66,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
Expand All @@ -84,32 +84,47 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
start = chainStart
end = chainEnd
}
if start < chainStart {
log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis")
if start < chainStart || start > chainEnd {
log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart)
start = chainStart
}
if end > chainEnd {
log.Warn("state reexecutor's end block number is greater than latest, resetting to latest")
if end > chainEnd || end < chainStart {
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
}
if c.Mode == "random" && end != start {
if c.BlocksPerThread > end-start {
c.BlocksPerThread = end - start
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
if rng > end-start {
rng = end - start
}
start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1)))
end = start + c.BlocksPerThread
start += uint64(rand.Intn(int(end - start - rng + 1)))
end = start + rng
}
// inclusive of block reexecution [start, end]
if start > 0 {
// Inclusive of block reexecution [start, end]
// Do not reexecute genesis block, i.e. chainStart
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
work := (end - start) / uint64(c.Room)
if work > 0 {
blocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
Expand All @@ -119,17 +134,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.blocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
// we don't use state release pattern here
// TODO do we want to use release pattern here?
startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
if err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
Expand Down Expand Up @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) {
log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end)
}

func (s *BlocksReExecutor) Start(ctx context.Context) {
func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
s.StopWaiter.Start(ctx, s)
s.LaunchThread(s.Impl)
s.LaunchThread(func(ctx context.Context) {
s.Impl(ctx)
if done != nil {
close(done)
}
})
}

func (s *BlocksReExecutor) StopAndWait() {
Expand Down
26 changes: 21 additions & 5 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,25 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
if nodeConfig.Init.ThenQuit {
success := make(chan struct{})
blocksReExecutor.Start(ctx, success)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
select {
case err := <-fatalErrChan:
log.Error("shutting down due to fatal error", "err", err)
defer log.Error("shut down due to fatal error", "err", err)
return 1
case <-success:
}
}
}

if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 {
return 0
}
Expand All @@ -529,8 +548,6 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var valNode *valnode.ValidationNode
if sameProcessValidationNodeEnabled {
valNode, err = valnode.CreateValidationNode(
Expand Down Expand Up @@ -659,9 +676,8 @@ func mainImpl() int {
// remove previous deferFuncs, StopAndWait closes database and blockchain.
deferFuncs = []func(){func() { currentNode.StopAndWait() }}
}
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor.Start(ctx)
if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit {
blocksReExecutor.Start(ctx, nil)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
}

Expand Down
7 changes: 4 additions & 3 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,9 @@ validationsLoop:
}
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot))
continue
notFoundErr := fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot)
v.possiblyFatal(notFoundErr)
return nil, notFoundErr
}
if v.chosenValidator[moduleRoot].Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
Expand Down Expand Up @@ -1107,7 +1108,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
}
}
if v.chosenValidator[root] == nil {
log.Error("validator not found", "WasmModuleRoot", root)
return fmt.Errorf("cannot validate WasmModuleRoot %v", root)
}
}
}
Expand Down
26 changes: 9 additions & 17 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) {
}
}

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.Start(ctx, success)

// Reexecute blocks at mode full
go func() {
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.StopWaiter.Start(ctx, executorFull)
executorFull.Impl(ctx)
executorFull.StopAndWait()
success <- struct{}{}
}()
select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand All @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) {
}

// Reexecute blocks at mode random
go func() {
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.StopWaiter.Start(ctx, executorRandom)
executorRandom.Impl(ctx)
executorRandom.StopAndWait()
success <- struct{}{}
}()
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.Start(ctx, success)

select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand Down

0 comments on commit 717b00b

Please sign in to comment.