Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve blocks re-execution and make it compatible with --init.then-quit #2222

Merged
merged 9 commits into from
May 23, 2024
54 changes: 37 additions & 17 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ func (c *Config) Validate() error {
}

var DefaultConfig = Config{
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
BlocksPerThread: 10000,
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}

var TestConfig = Config{
Expand Down Expand Up @@ -84,25 +83,39 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
start = chainStart
end = chainEnd
}
if start < chainStart {
log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis")
if start < chainStart || start > chainEnd {
log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart)
start = chainStart
}
if end > chainEnd {
log.Warn("state reexecutor's end block number is greater than latest, resetting to latest")
if end > chainEnd || end < chainStart {
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
if c.Mode == "random" && end != start {
if c.BlocksPerThread > end-start {
c.BlocksPerThread = end - start
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := uint64(10000)
if c.BlocksPerThread != 0 {
rng = c.BlocksPerThread
}
if rng > end-start {
rng = end - start
}
start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1)))
end = start + c.BlocksPerThread
start += uint64(rand.Intn(int(end - start - rng + 1)))
end = start + rng
}
// inclusive of block reexecution [start, end]
if start > 0 {
// Inclusive of block reexecution [start, end]
// Do not reexecute genesis block i,e chainStart
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads
if c.BlocksPerThread == 0 {
c.BlocksPerThread = 10000
work := (end - start) / uint64(c.Room)
if work > 0 {
c.BlocksPerThread = work
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
Expand All @@ -125,11 +138,13 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
}
// we don't use state release pattern here
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
// TODO do we want to use release pattern here?
startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
if err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
Expand Down Expand Up @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) {
log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end)
}

func (s *BlocksReExecutor) Start(ctx context.Context) {
func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
s.StopWaiter.Start(ctx, s)
s.LaunchThread(s.Impl)
s.LaunchThread(func(ctx context.Context) {
s.Impl(ctx)
if done != nil {
close(done)
}
})
}

func (s *BlocksReExecutor) StopAndWait() {
Expand Down
26 changes: 21 additions & 5 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,25 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
if nodeConfig.Init.ThenQuit {
success := make(chan struct{})
blocksReExecutor.Start(ctx, success)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
select {
case err := <-fatalErrChan:
log.Error("shutting down due to fatal error", "err", err)
defer log.Error("shut down due to fatal error", "err", err)
return 1
case <-success:
}
}
}

if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 {
return 0
}
Expand All @@ -514,8 +533,6 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var valNode *valnode.ValidationNode
if sameProcessValidationNodeEnabled {
valNode, err = valnode.CreateValidationNode(
Expand Down Expand Up @@ -644,9 +661,8 @@ func mainImpl() int {
// remove previous deferFuncs, StopAndWait closes database and blockchain.
deferFuncs = []func(){func() { currentNode.StopAndWait() }}
}
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor.Start(ctx)
if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit {
blocksReExecutor.Start(ctx, nil)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
}

Expand Down
26 changes: 9 additions & 17 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) {
}
}

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.Start(ctx, success)

// Reexecute blocks at mode full
go func() {
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.StopWaiter.Start(ctx, executorFull)
executorFull.Impl(ctx)
executorFull.StopAndWait()
success <- struct{}{}
}()
select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand All @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) {
}

// Reexecute blocks at mode random
go func() {
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.StopWaiter.Start(ctx, executorRandom)
executorRandom.Impl(ctx)
executorRandom.StopAndWait()
success <- struct{}{}
}()
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.Start(ctx, success)

select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand Down
Loading