Skip to content

Commit

Permalink
Merge branch 'master' into pebble-extra-options
Browse files Browse the repository at this point in the history
  • Loading branch information
PlasmaPower authored May 23, 2024
2 parents bc31447 + 35bd2aa commit a4e6f3e
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 63 deletions.
90 changes: 55 additions & 35 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ func (c *Config) Validate() error {
if c.EndBlock < c.StartBlock {
return errors.New("invalid block range for blocks re-execution")
}
if c.Room == 0 {
return errors.New("room for blocks re-execution cannot be zero")
if c.Room <= 0 {
return errors.New("room for blocks re-execution should be greater than 0")
}
return nil
}

var DefaultConfig = Config{
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
BlocksPerThread: 10000,
Enable: false,
Mode: "random",
Room: runtime.NumCPU(),
}

var TestConfig = Config{
Expand All @@ -66,13 +65,14 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
Expand All @@ -84,32 +84,47 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
start = chainStart
end = chainEnd
}
if start < chainStart {
log.Warn("state reexecutor's start block number is lower than genesis, resetting to genesis")
if start < chainStart || start > chainEnd {
log.Warn("invalid state reexecutor's start block number, resetting to genesis", "start", start, "genesis", chainStart)
start = chainStart
}
if end > chainEnd {
log.Warn("state reexecutor's end block number is greater than latest, resetting to latest")
if end > chainEnd || end < chainStart {
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
}
if c.Mode == "random" && end != start {
if c.BlocksPerThread > end-start {
c.BlocksPerThread = end - start
// Reexecute a randomly picked range of 10000 blocks (or c.BlocksPerThread blocks when non-zero), capped at end-start, between start and end
rng := blocksPerThread
if rng > end-start {
rng = end - start
}
start += uint64(rand.Intn(int(end - start - c.BlocksPerThread + 1)))
end = start + c.BlocksPerThread
start += uint64(rand.Intn(int(end - start - rng + 1)))
end = start + rng
}
// inclusive of block reexecution [start, end]
if start > 0 {
// Inclusive of block reexecution [start, end]
// Do not reexecute the genesis block, i.e. chainStart
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
work := (end - start) / uint64(c.Room)
if work > 0 {
blocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
Expand All @@ -119,17 +134,17 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.blocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.config.BlocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
// we don't use state release pattern here
// TODO do we want to use release pattern here?
startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
if err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
Expand Down Expand Up @@ -169,9 +184,14 @@ func (s *BlocksReExecutor) Impl(ctx context.Context) {
log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", s.startBlock, "startBlock", s.startBlock+1, "endBlock", end)
}

func (s *BlocksReExecutor) Start(ctx context.Context) {
func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
s.StopWaiter.Start(ctx, s)
s.LaunchThread(s.Impl)
s.LaunchThread(func(ctx context.Context) {
s.Impl(ctx)
if done != nil {
close(done)
}
})
}

func (s *BlocksReExecutor) StopAndWait() {
Expand Down
26 changes: 21 additions & 5 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,25 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
if nodeConfig.Init.ThenQuit {
success := make(chan struct{})
blocksReExecutor.Start(ctx, success)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
select {
case err := <-fatalErrChan:
log.Error("shutting down due to fatal error", "err", err)
defer log.Error("shut down due to fatal error", "err", err)
return 1
case <-success:
}
}
}

if nodeConfig.Init.ThenQuit && nodeConfig.Init.ResetToMessage < 0 {
return 0
}
Expand All @@ -529,8 +548,6 @@ func mainImpl() int {
return 1
}

fatalErrChan := make(chan error, 10)

var valNode *valnode.ValidationNode
if sameProcessValidationNodeEnabled {
valNode, err = valnode.CreateValidationNode(
Expand Down Expand Up @@ -659,9 +676,8 @@ func mainImpl() int {
// remove previous deferFuncs, StopAndWait closes database and blockchain.
deferFuncs = []func(){func() { currentNode.StopAndWait() }}
}
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor.Start(ctx)
if blocksReExecutor != nil && !nodeConfig.Init.ThenQuit {
blocksReExecutor.Start(ctx, nil)
deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
}

Expand Down
17 changes: 14 additions & 3 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream
if streamName == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
consumer := &Consumer[Request, Response]{
return &Consumer[Request, Response]{
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
}
return consumer, nil
}, nil
}

// Start starts the consumer to iteratively perform heartbeat in configured intervals.
Expand All @@ -80,6 +79,7 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) {

// StopAndWait stops the consumer's embedded StopWaiter and waits for it to
// finish, then deletes the consumer's heartbeat key.
func (c *Consumer[Request, Response]) StopAndWait() {
c.StopWaiter.StopAndWait()
// The deletion runs on the parent context rather than the StopWaiter's own
// context — presumably because the latter is no longer usable once the
// StopWaiter has been stopped; TODO confirm.
c.deleteHeartBeat(c.GetParentContext())
}

func heartBeatKey(id string) string {
Expand All @@ -90,6 +90,17 @@ func (c *Consumer[Request, Response]) heartBeatKey() string {
return heartBeatKey(c.id)
}

// deleteHeartBeat deletes this consumer's heartbeat key from redis to indicate
// that the consumer is being shut down.
//
// A failed deletion is only logged; no error is returned to the caller.
func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) {
	if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil {
		// NOTE(review): the failure is logged at Error level when ctx itself
		// has already failed and at Info level otherwise — confirm this is
		// the intended polarity.
		l := log.Info
		if ctx.Err() != nil {
			l = log.Error
		}
		l("Deleting heartbeat", "consumer", c.id, "error", err)
	}
}

// heartBeat updates the heartBeat key indicating aliveness.
func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -201,6 +203,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques
}

func TestRedisProduce(t *testing.T) {
log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
t.Parallel()
for _, tc := range []struct {
name string
Expand All @@ -212,7 +215,7 @@ func TestRedisProduce(t *testing.T) {
},
{
name: "some consumers killed, others should take over their work",
killConsumers: false,
killConsumers: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -229,21 +232,23 @@ func TestRedisProduce(t *testing.T) {
// Consume messages in every third consumer but don't ack them to check
// that other consumers will claim ownership on those messages.
for i := 0; i < len(consumers); i += 3 {
consumers[i].Start(ctx)
if _, err := consumers[i].Consume(ctx); err != nil {
t.Errorf("Error consuming message: %v", err)
}
consumers[i].StopAndWait()
}

}
time.Sleep(time.Second)
gotMessages, wantResponses := consume(ctx, t, consumers)
gotResponses, err := awaitResponses(ctx, promises)
if err != nil {
t.Fatalf("Error awaiting responses: %v", err)
}
producer.StopAndWait()
for _, c := range consumers {
c.StopWaiter.StopAndWait()
c.StopAndWait()
}
got, err := mergeValues(gotMessages)
if err != nil {
Expand Down Expand Up @@ -280,6 +285,7 @@ func TestRedisReproduceDisabled(t *testing.T) {
// Consume messages in every third consumer but don't ack them to check
// that other consumers will claim ownership on those messages.
for i := 0; i < len(consumers); i += 3 {
consumers[i].Start(ctx)
if _, err := consumers[i].Consume(ctx); err != nil {
t.Errorf("Error consuming message: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
redisURL := ""
if useRedisStreams {
redisURL = redisutil.CreateTestRedis(ctx, t)
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.DefaultValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
}

Expand Down
26 changes: 9 additions & 17 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func TestBlocksReExecutorModes(t *testing.T) {
}
}

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.Start(ctx, success)

// Reexecute blocks at mode full
go func() {
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull.StopWaiter.Start(ctx, executorFull)
executorFull.Impl(ctx)
executorFull.StopAndWait()
success <- struct{}{}
}()
select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand All @@ -66,15 +61,12 @@ func TestBlocksReExecutorModes(t *testing.T) {
}

// Reexecute blocks at mode random
go func() {
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.StopWaiter.Start(ctx, executorRandom)
executorRandom.Impl(ctx)
executorRandom.StopAndWait()
success <- struct{}{}
}()
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom.Start(ctx, success)

select {
case err := <-feedErrChan:
t.Errorf("error occurred: %v", err)
Expand Down

0 comments on commit a4e6f3e

Please sign in to comment.