Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2845] Add a nitro option to stop syncing at a given block number #2749

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
)
Expand Down Expand Up @@ -223,6 +224,10 @@ func (d *DelayedSequencer) run(ctx context.Context) {
return
}
if err := d.trySequence(ctx, nextHeader); err != nil {
if errors.Is(err, gethexec.ExecutionEngineBlockCreationStopped) {
log.Info("stopping block creation in delayed sequencer because execution engine has stopped")
return
}
log.Error("Delayed sequencer error", "err", err)
}
case <-ctx.Done():
Expand Down
30 changes: 22 additions & 8 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -122,15 +123,28 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
r.LaunchThread(func(ctx context.Context) {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
for {
err := r.run(ctx, hadError)
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in inbox reader because transaction streamer has stopped")
return
}
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
}
interval := time.Second
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
return time.Second
})

// Ensure we read the init message before other things start up
Expand Down
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig }
execEngine, err := gethexec.NewExecutionEngine(bc)
execEngine, err := gethexec.NewExecutionEngine(bc, 0)
if err != nil {
Fail(t, err)
}
Expand Down
57 changes: 42 additions & 15 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/contracts"
Expand Down Expand Up @@ -593,14 +594,14 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
func (c *SeqCoordinator) update(ctx context.Context) (time.Duration, error) {
chosenSeq, err := c.RedisCoordinator().RecommendSequencerWantingLockout(ctx)
if err != nil {
log.Warn("coordinator failed finding sequencer wanting lockout", "err", err)
return c.retryAfterRedisError()
return c.retryAfterRedisError(), nil
}
if c.prevChosenSequencer == c.config.Url() {
return c.updateWithLockout(ctx, chosenSeq)
return c.updateWithLockout(ctx, chosenSeq), nil
}
if chosenSeq != c.config.Url() && chosenSeq != c.prevChosenSequencer {
var err error
Expand All @@ -621,14 +622,14 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
localMsgCount, err := c.streamer.GetMessageCount()
if err != nil {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
return c.config.UpdateInterval, nil
}
// Cache the previous redis coordinator's message count
if c.prevRedisCoordinator != nil && c.prevRedisMessageCount == 0 {
prevRemoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.prevRedisCoordinator.Client)
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
return c.retryAfterRedisError(), nil
}
c.prevRedisMessageCount = prevRemoteMsgCount
}
Expand All @@ -643,7 +644,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
return c.retryAfterRedisError(), nil
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
client := c.RedisCoordinator().Client
Expand Down Expand Up @@ -720,14 +721,17 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
}
if len(messages) > 0 {
if err := c.streamer.AddMessages(localMsgCount, false, messages); err != nil {
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
return time.Duration(0), broadcastclient.TransactionStreamerBlockCreationStopped
}
log.Warn("coordinator failed to add messages", "err", err, "pos", localMsgCount, "length", len(messages))
} else {
localMsgCount = msgToRead
}
}

if c.config.Url() == redisutil.INVALID_URL {
return c.noRedisError()
return c.noRedisError(), nil
}

// Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago
Expand All @@ -745,7 +749,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
if synced && localMsgCount >= remoteMsgCount && chosenSeq == c.config.Url() {
if c.sequencer == nil {
log.Error("myurl main sequencer, but no sequencer exists")
return c.noRedisError()
return c.noRedisError(), nil
}
processedMessages, err := c.streamer.GetProcessedMessageCount()
if err != nil {
Expand All @@ -765,7 +769,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Warn("failed to update wants lockout key", "err", err)
}
c.prevChosenSequencer = ""
return c.retryAfterRedisError()
return c.retryAfterRedisError(), nil
}
log.Info("caught chosen-coordinator lock", "myUrl", c.config.Url())
if c.delayedSequencer != nil {
Expand All @@ -782,7 +786,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
}
c.sequencer.Activate()
c.prevChosenSequencer = c.config.Url()
return c.noRedisError()
return c.noRedisError(), nil
}
}

Expand All @@ -798,9 +802,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
}

if (wantsLockoutErr != nil) || (msgReadErr != nil) {
return c.retryAfterRedisError()
//lint:ignore nilerr we want to retry after redis error
return c.retryAfterRedisError(), nil
}
return c.noRedisError()
return c.noRedisError(), nil
}

// Warning: acquires the wantsLockoutMutex
Expand Down Expand Up @@ -865,19 +870,41 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) {
err, "newRedisUrl", c.config.NewRedisUrl)
}
}
c.CallIteratively(func(ctx context.Context) time.Duration { return c.chooseRedisAndUpdate(ctx, newRedisCoordinator) })
amsanghi marked this conversation as resolved.
Show resolved Hide resolved

c.LaunchThread(func(ctx context.Context) {
for {
interval, err := c.chooseRedisAndUpdate(ctx, newRedisCoordinator)
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in sequencer because transaction streamer has stopped")
return
}
if ctx.Err() != nil {
return
}
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
})
if c.config.ChosenHealthcheckAddr != "" {
c.StopWaiter.LaunchThread(c.launchHealthcheckServer)
}
}

func (c *SeqCoordinator) chooseRedisAndUpdate(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) time.Duration {
func (c *SeqCoordinator) chooseRedisAndUpdate(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) (time.Duration, error) {
// If we have a new redis coordinator, and we haven't switched to it yet, try to switch.
if c.config.NewRedisUrl != "" && c.prevRedisCoordinator == nil {
// If we fail to try to switch, we'll retry soon.
if err := c.trySwitchingRedis(ctx, newRedisCoordinator); err != nil {
log.Warn("error while trying to switch redis coordinator", "err", err)
return c.retryAfterRedisError()
return c.retryAfterRedisError(), nil
}
}
return c.update(ctx)
Expand Down
13 changes: 13 additions & 0 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/execution"
Expand Down Expand Up @@ -74,6 +75,7 @@ type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig
Expand All @@ -82,18 +84,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages")
f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)")
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
}

func NewTransactionStreamer(
Expand Down Expand Up @@ -1044,6 +1049,10 @@ func (s *TransactionStreamer) broadcastMessages(
// The mutex must be held, and pos must be the latest message count.
// `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
if s.config().SyncTillBlock > 0 && uint64(pos) > s.config().SyncTillBlock {
return broadcastclient.TransactionStreamerBlockCreationStopped
}

if batch == nil {
batch = s.db.NewBatch()
}
Expand Down Expand Up @@ -1212,6 +1221,10 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
}

func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration {
if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock {
log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock)
return s.config().ExecuteMessageLoopDelay
}
if s.ExecuteNextMsg(ctx) {
return 0
}
Expand Down
6 changes: 6 additions & 0 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
sourcesDisconnectedGauge = metrics.NewRegisteredGauge("arb/feed/sources/disconnected", nil)
)

var TransactionStreamerBlockCreationStopped = errors.New("block creation stopped in transaction streamer")

type FeedConfig struct {
Output wsbroadcastserver.BroadcasterConfig `koanf:"output" reload:"hot"`
Input Config `koanf:"input" reload:"hot"`
Expand Down Expand Up @@ -434,6 +436,10 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
bc.nextSeqNum = message.SequenceNumber + 1
}
if err := bc.txStreamer.AddBroadcastMessages(res.Messages); err != nil {
if errors.Is(err, TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in broadcast client because transaction streamer has stopped")
return
}
log.Error("Error adding message from Sequencer Feed", "err", err)
}
}
Expand Down
5 changes: 5 additions & 0 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package broadcastclients

import (
"context"
"errors"
"sync/atomic"
"time"

Expand Down Expand Up @@ -194,6 +195,10 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) {
log.Info("stopping block creation in broadcast clients because transaction streamer has stopped")
return
}
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
Expand Down
1 change: 1 addition & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ func mainImpl() int {
l2BlockChain,
l1Client,
func() *gethexec.Config { return &liveNodeConfig.Get().Execution },
liveNodeConfig.Get().Node.TransactionStreamer.SyncTillBlock,
)
if err != nil {
log.Error("failed to create execution node", "err", err)
Expand Down
14 changes: 13 additions & 1 deletion execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ var (
gasUsedSinceStartupCounter = metrics.NewRegisteredCounter("arb/gas_used", nil)
)

var ExecutionEngineBlockCreationStopped = errors.New("block creation stopped in execution engine")

type L1PriceDataOfMsg struct {
callDataUnits uint64
cummulativeCallDataUnits uint64
Expand Down Expand Up @@ -92,6 +94,7 @@ type ExecutionEngine struct {
prefetchBlock bool

cachedL1PriceData *L1PriceData
syncTillBlock uint64
}

func NewL1PriceData() *L1PriceData {
Expand All @@ -100,12 +103,13 @@ func NewL1PriceData() *L1PriceData {
}
}

func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
func NewExecutionEngine(bc *core.BlockChain, syncTillBlock uint64) (*ExecutionEngine, error) {
return &ExecutionEngine{
bc: bc,
resequenceChan: make(chan []*arbostypes.MessageWithMetadata),
newBlockNotifier: make(chan struct{}, 1),
cachedL1PriceData: NewL1PriceData(),
syncTillBlock: syncTillBlock,
}, nil
}

Expand Down Expand Up @@ -585,6 +589,10 @@ func (s *ExecutionEngine) SequenceDelayedMessage(message *arbostypes.L1IncomingM
}

func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) (*types.Block, error) {
if s.syncTillBlock > 0 && s.latestBlock.NumberU64() >= s.syncTillBlock {
return nil, ExecutionEngineBlockCreationStopped
}

currentHeader, err := s.getCurrentHeader()
if err != nil {
return nil, err
Expand Down Expand Up @@ -946,6 +954,10 @@ func (s *ExecutionEngine) Start(ctx_in context.Context) {
s.StopWaiter.Start(ctx_in, s)
s.LaunchThread(func(ctx context.Context) {
for {
if s.syncTillBlock > 0 && s.latestBlock.NumberU64() >= s.syncTillBlock {
log.Info("stopping block creation in execution engine", "syncTillBlock", s.syncTillBlock)
return
}
select {
case <-ctx.Done():
return
Expand Down
Loading
Loading