Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve blocks re-execution and make it compatible with --init.then-quit #2222

Merged
merged 9 commits into from
May 23, 2024
90 changes: 55 additions & 35 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ func (c *Config) Validate() error {
if c.EndBlock < c.StartBlock {
return errors.New("invalid block range for blocks re-execution")
}
if c.Room == 0 {
return errors.New("room for blocks re-execution cannot be zero")
if c.Room <= 0 {
return errors.New("room for blocks re-execution should be greater than 0")
}
return nil
}

var DefaultConfig = Config{
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
BlocksPerThread: 10000,
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}

var TestConfig = Config{
Expand All @@ -66,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
Expand All @@ -84,32 +84,47 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
start = chainStart
end = chainEnd
}
if start < chainStart {
log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis")
if start < chainStart || start > chainEnd {
log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart)
start = chainStart
}
if end > chainEnd {
log.Warn("state reexecutor's end block number is greater than latest, resetting to latest")
if end > chainEnd || end < chainStart {
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
}
if c.Mode == "random" && end != start {
if c.BlocksPerThread > end-start {
c.BlocksPerThread = end - start
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
if rng > end-start {
rng = end - start
}
start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1)))
end = start + c.BlocksPerThread
start += uint64(rand.Intn(int(end - start - rng + 1)))
end = start + rng
}
// inclusive of block reexecution [start, end]
if start > 0 {
// Inclusive of block reexecution [start, end]
// Do not reexecute genesis block i,e chainStart
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
work := (end - start) / uint64(c.Room)
if work > 0 {
blocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
Expand All @@ -119,17 +134,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
// we don't use state release pattern here
// TODO do we want to use release pattern here?
startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
if err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
Expand Down Expand Up @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) {
log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end)
}

func (s *BlocksReExecutor) Start(ctx context.Context) {
func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
s.StopWaiter.Start(ctx, s)
s.LaunchThread(s.Impl)
s.LaunchThread(func(ctx context.Context) {
s.Impl(ctx)
if done != nil {
close(done)
}
})
}

func (s *BlocksReExecutor) StopAndWait() {
Expand Down
26 changes: 21 additions & 5 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,25 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
if nodeConfig.Init.ThenQuit {
success := make(chan struct{})
blocksReExecutor.Start(ctx, success)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
select {
case err := <-fatalErrChan:
log.Error("shutting down due to fatal error", "err", err)
defer log.Error("shut down due to fatal error", "err", err)
return 1
case <-success:
}
}
}

if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 {
return 0
}
Expand All @@ -529,8 +548,6 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var valNode *valnode.ValidationNode
if sameProcessValidationNodeEnabled {
valNode, err = valnode.CreateValidationNode(
Expand Down Expand Up @@ -659,9 +676,8 @@ func mainImpl() int {
// remove previous deferFuncs, StopAndWait closes database and blockchain.
deferFuncs = []func(){func() { currentNode.StopAndWait() }}
}
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor.Start(ctx)
if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit {
blocksReExecutor.Start(ctx, nil)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
}

Expand Down
26 changes: 9 additions & 17 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) {
}
}

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.Start(ctx, success)

// Reexecute blocks at mode full
go func() {
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.StopWaiter.Start(ctx, executorFull)
executorFull.Impl(ctx)
executorFull.StopAndWait()
success <- struct{}{}
}()
select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand All @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) {
}

// Reexecute blocks at mode random
go func() {
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.StopWaiter.Start(ctx, executorRandom)
executorRandom.Impl(ctx)
executorRandom.StopAndWait()
success <- struct{}{}
}()
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.Start(ctx, success)

select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand Down
Loading