Skip to content

Commit

Permalink
Merge pull request #2437 from OffchainLabs/fix-seqcoordinator-lockout
Browse files Browse the repository at this point in the history
Sequencer coordinator shouldn't want lockout if local blockchain is lagging too much behind transaction streamer
  • Loading branch information
tsahee authored Jun 28, 2024
2 parents 03961e2 + 9d7483a commit 9085e2b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 12 deletions.
5 changes: 5 additions & 0 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type execClientWrapper struct {
func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }
func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} {
w.t.Error("not supported")
return nil
}

func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
chainConfig := params.ArbitrumDevTestChainConfig()
Expand Down
9 changes: 4 additions & 5 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -150,7 +149,6 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -607,9 +605,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
return c.noRedisError()
}

syncProgress := c.sync.SyncProgressMap()
synced := len(syncProgress) == 0
// Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago
synced := c.sequencer.Synced()
if !synced {
syncProgress := c.sequencer.FullSyncProgressMap()
var detailsList []interface{}
for key, value := range syncProgress {
detailsList = append(detailsList, key, value)
Expand Down Expand Up @@ -849,7 +848,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) {
defer c.wantsLockoutMutex.Unlock()
c.avoidLockout--
log.Info("seeking lockout", "myUrl", c.config.Url())
if c.sync.Synced() {
if c.sequencer.Synced() {
// Even if this errors we still internally marked ourselves as wanting the lockout
err := c.wantsLockoutUpdateWithMutex(ctx)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,11 @@ func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageInde
func (n *ExecutionNode) Maintenance() error {
return n.ChainDB.Compact(nil, nil)
}

func (n *ExecutionNode) Synced() bool {
return n.SyncMonitor.Synced()
}

func (n *ExecutionNode) FullSyncProgressMap() map[string]interface{} {
return n.SyncMonitor.FullSyncProgressMap()
}
17 changes: 10 additions & 7 deletions execution/gethexec/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
}

func (s *SyncMonitor) SyncProgressMap() map[string]interface{} {
if s.consensus.Synced() {
built, err := s.exec.HeadMessageNumber()
consensusSyncTarget := s.consensus.SyncTargetMessageCount()
if err == nil && built+1 >= consensusSyncTarget {
return make(map[string]interface{})
}
if s.Synced() {
return make(map[string]interface{})
}
return s.FullSyncProgressMap()
}
Expand Down Expand Up @@ -112,7 +108,14 @@ func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error)
}

func (s *SyncMonitor) Synced() bool {
return len(s.SyncProgressMap()) == 0
if s.consensus.Synced() {
built, err := s.exec.HeadMessageNumber()
consensusSyncTarget := s.consensus.SyncTargetMessageCount()
if err == nil && built+1 >= consensusSyncTarget {
return true
}
}
return false
}

func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) {
Expand Down
2 changes: 2 additions & 0 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type ExecutionSequencer interface {
SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error
NextDelayedMessageNumber() (uint64, error)
MarkFeedStart(to arbutil.MessageIndex)
Synced() bool
FullSyncProgressMap() map[string]interface{}
}

type FullExecutionClient interface {
Expand Down

0 comments on commit 9085e2b

Please sign in to comment.