From 216766291b6b367d505a43f8ff5d72f7177617fb Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 26 Jun 2024 16:55:56 -0500 Subject: [PATCH 1/5] Sequencer shouldn't want lockout if local blockchain is lagging too much behind transaction streamer --- arbnode/seq_coordinator.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index ecf38ddf42..e6c259fd10 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -617,17 +617,18 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("sequencer is not synced", detailsList...) } + processedMessages, err := c.streamer.GetProcessedMessageCount() + if err != nil { + log.Warn("coordinator: failed to read processed message count", "err", err) + processedMessages = 0 + } + // can take over as main sequencer? if synced && localMsgCount >= remoteMsgCount && chosenSeq == c.config.Url() { if c.sequencer == nil { log.Error("myurl main sequencer, but no sequencer exists") return c.noRedisError() } - processedMessages, err := c.streamer.GetProcessedMessageCount() - if err != nil { - log.Warn("coordinator: failed to read processed message count", "err", err) - processedMessages = 0 - } if processedMessages >= localMsgCount { // we're here because we don't currently hold the lock // sequencer is already either paused or forwarding @@ -663,8 +664,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } // update wanting the lockout + // Sequencer should want lockout if and only if- its synced, not avoiding lockout and processedMessages is not lagging too much behind localMsgCount var wantsLockoutErr error - if synced && !c.AvoidingLockout() { + if synced && !c.AvoidingLockout() && processedMessages+10 >= localMsgCount { wantsLockoutErr = c.wantsLockoutUpdate(ctx) } else { wantsLockoutErr = c.wantsLockoutRelease(ctx) From cfe6a8041e9ef88eafaba8a8406619feb500f926 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 27 Jun 2024 10:30:00 -0500 Subject: [PATCH 2/5] address PR comments --- arbnode/seq_coordinator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index e6c259fd10..2b8b09d576 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -607,9 +607,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { return c.noRedisError() } - syncProgress := c.sync.SyncProgressMap() - synced := len(syncProgress) == 0 + synced := c.sync.Synced() if !synced { + syncProgress := c.sync.FullSyncProgressMap() var detailsList []interface{} for key, value := range syncProgress { detailsList = append(detailsList, key, value) @@ -666,7 +666,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { // update wanting the lockout // Sequencer should want lockout if and only if- its synced, not avoiding lockout and processedMessages is not lagging too much behind localMsgCount var wantsLockoutErr error - if synced && !c.AvoidingLockout() && processedMessages+10 >= localMsgCount { + if synced && !c.AvoidingLockout() && processedMessages+1 >= c.sync.SyncTargetMessageCount() { wantsLockoutErr = c.wantsLockoutUpdate(ctx) } else { wantsLockoutErr = c.wantsLockoutRelease(ctx) From 7ddb65b7d0e8fb2f29d41f52ee0880907f17b3c7 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 27 Jun 2024 11:19:35 -0500 Subject: [PATCH 3/5] minor fix --- arbnode/seq_coordinator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 2b8b09d576..54eb873716 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -664,9 +664,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } // update wanting the lockout - // Sequencer should want lockout if and only if- its synced, not avoiding lockout and processedMessages is not lagging too much behind localMsgCount + // Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago var wantsLockoutErr error - if synced && !c.AvoidingLockout() && processedMessages+1 >= c.sync.SyncTargetMessageCount() { + if synced && !c.AvoidingLockout() && processedMessages >= c.sync.SyncTargetMessageCount() { wantsLockoutErr = c.wantsLockoutUpdate(ctx) } else { wantsLockoutErr = c.wantsLockoutRelease(ctx) From 59398ab93a1813ad47f636ea73f517b4bedfb977 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 27 Jun 2024 14:42:10 -0500 Subject: [PATCH 4/5] use gethexec syncmonitor in seq-coordinator to decide wanting lockout --- arbnode/inbox_test.go | 4 ++++ arbnode/seq_coordinator.go | 23 ++++++++++------------- execution/gethexec/node.go | 4 ++++ execution/interface.go | 2 +- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 252d7c9b7d..0a202a15f9 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -37,6 +37,10 @@ type execClientWrapper struct { func (w *execClientWrapper) Pause() { w.t.Error("not supported") } func (w *execClientWrapper) Activate() { w.t.Error("not supported") } func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil } +func (w *execClientWrapper) SyncProgressMap() map[string]interface{} { + w.t.Error("not supported") + return nil +} func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) { chainConfig := params.ArbitrumDevTestChainConfig() diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 54eb873716..ef7eed946a 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -39,7 +39,6 @@ type SeqCoordinator struct { redisutil.RedisCoordinator - sync *SyncMonitor streamer *TransactionStreamer sequencer execution.ExecutionSequencer delayedSequencer *DelayedSequencer @@ -150,7 +149,6 @@ func NewSeqCoordinator( } coordinator := &SeqCoordinator{ RedisCoordinator: *redisCoordinator, - sync: sync, streamer: streamer, sequencer: sequencer, config: config, @@ -607,9 +605,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { return c.noRedisError() } - synced := c.sync.Synced() + // Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago + syncProgress := c.sequencer.SyncProgressMap() + synced := len(syncProgress) == 0 if !synced { - syncProgress := c.sync.FullSyncProgressMap() var detailsList []interface{} for key, value := range syncProgress { detailsList = append(detailsList, key, value) @@ -617,18 +616,17 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("sequencer is not synced", detailsList...) } - processedMessages, err := c.streamer.GetProcessedMessageCount() - if err != nil { - log.Warn("coordinator: failed to read processed message count", "err", err) - processedMessages = 0 - } - // can take over as main sequencer? if synced && localMsgCount >= remoteMsgCount && chosenSeq == c.config.Url() { if c.sequencer == nil { log.Error("myurl main sequencer, but no sequencer exists") return c.noRedisError() } + processedMessages, err := c.streamer.GetProcessedMessageCount() + if err != nil { + log.Warn("coordinator: failed to read processed message count", "err", err) + processedMessages = 0 + } if processedMessages >= localMsgCount { // we're here because we don't currently hold the lock // sequencer is already either paused or forwarding @@ -664,9 +662,8 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } // update wanting the lockout - // Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago var wantsLockoutErr error - if synced && !c.AvoidingLockout() && processedMessages >= c.sync.SyncTargetMessageCount() { + if synced && !c.AvoidingLockout() { wantsLockoutErr = c.wantsLockoutUpdate(ctx) } else { wantsLockoutErr = c.wantsLockoutRelease(ctx) @@ -851,7 +848,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) { defer c.wantsLockoutMutex.Unlock() c.avoidLockout-- log.Info("seeking lockout", "myUrl", c.config.Url()) - if c.sync.Synced() { + if len(c.sequencer.SyncProgressMap()) == 0 { // Even if this errors we still internally marked ourselves as wanting the lockout err := c.wantsLockoutUpdateWithMutex(ctx) if err != nil { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index cb2bfe12e8..5fe100970a 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -417,3 +417,7 @@ func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageInde func (n *ExecutionNode) Maintenance() error { return n.ChainDB.Compact(nil, nil) } + +func (n *ExecutionNode) SyncProgressMap() map[string]interface{} { + return n.SyncMonitor.SyncProgressMap() +} diff --git a/execution/interface.go b/execution/interface.go index 32ec7dd0f7..7c4efbc2dc 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -57,10 +57,10 @@ type ExecutionSequencer interface { SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) MarkFeedStart(to arbutil.MessageIndex) + SyncProgressMap() map[string]interface{} } type FullExecutionClient interface { - ExecutionClient ExecutionRecorder ExecutionSequencer From 0ae39c8c154d8180e337cc3968b5d4bbbe4d7729 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 28 Jun 2024 10:47:05 -0500 Subject: [PATCH 5/5] refactor gethexec sync monitor and update ExecutionSequencer interface --- arbnode/inbox_test.go | 3 ++- arbnode/seq_coordinator.go | 6 +++--- execution/gethexec/node.go | 8 ++++++-- execution/gethexec/sync_monitor.go | 17 ++++++++++------- execution/interface.go | 4 +++- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 0a202a15f9..ef4acd038c 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -37,7 +37,8 @@ type execClientWrapper struct { func (w *execClientWrapper) Pause() { w.t.Error("not supported") } func (w *execClientWrapper) Activate() { w.t.Error("not supported") } func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil } -func (w *execClientWrapper) SyncProgressMap() map[string]interface{} { +func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false } +func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} { w.t.Error("not supported") return nil } diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index ef7eed946a..cdf1011b11 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -606,9 +606,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } // Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago - syncProgress := c.sequencer.SyncProgressMap() - synced := len(syncProgress) == 0 + synced := c.sequencer.Synced() if !synced { + syncProgress := c.sequencer.FullSyncProgressMap() var detailsList []interface{} for key, value := range syncProgress { detailsList = append(detailsList, key, value) @@ -848,7 +848,7 @@ func (c *SeqCoordinator) SeekLockout(ctx context.Context) { defer c.wantsLockoutMutex.Unlock() c.avoidLockout-- log.Info("seeking lockout", "myUrl", c.config.Url()) - if len(c.sequencer.SyncProgressMap()) == 0 { + if c.sequencer.Synced() { // Even if this errors we still internally marked ourselves as wanting the lockout err := c.wantsLockoutUpdateWithMutex(ctx) if err != nil { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 5fe100970a..8ee16095d9 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -418,6 +418,10 @@ func (n *ExecutionNode) Maintenance() error { return n.ChainDB.Compact(nil, nil) } -func (n *ExecutionNode) SyncProgressMap() map[string]interface{} { - return n.SyncMonitor.SyncProgressMap() +func (n *ExecutionNode) Synced() bool { + return n.SyncMonitor.Synced() +} + +func (n *ExecutionNode) FullSyncProgressMap() map[string]interface{} { + return n.SyncMonitor.FullSyncProgressMap() } diff --git a/execution/gethexec/sync_monitor.go b/execution/gethexec/sync_monitor.go index 564c6d74bd..86949c7767 100644 --- a/execution/gethexec/sync_monitor.go +++ b/execution/gethexec/sync_monitor.go @@ -59,12 +59,8 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} { } func (s *SyncMonitor) SyncProgressMap() map[string]interface{} { - if s.consensus.Synced() { - built, err := s.exec.HeadMessageNumber() - consensusSyncTarget := s.consensus.SyncTargetMessageCount() - if err == nil && built+1 >= consensusSyncTarget { - return make(map[string]interface{}) - } + if s.Synced() { + return make(map[string]interface{}) } return s.FullSyncProgressMap() } @@ -112,7 +108,14 @@ func (s *SyncMonitor) FinalizedBlockNumber(ctx context.Context) (uint64, error) } func (s *SyncMonitor) Synced() bool { - return len(s.SyncProgressMap()) == 0 + if s.consensus.Synced() { + built, err := s.exec.HeadMessageNumber() + consensusSyncTarget := s.consensus.SyncTargetMessageCount() + if err == nil && built+1 >= consensusSyncTarget { + return true + } + } + return false } func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) { diff --git a/execution/interface.go b/execution/interface.go index 7c4efbc2dc..ddf30b4b2a 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -57,10 +57,12 @@ type ExecutionSequencer interface { SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) MarkFeedStart(to arbutil.MessageIndex) - SyncProgressMap() map[string]interface{} + Synced() bool + FullSyncProgressMap() map[string]interface{} } type FullExecutionClient interface { + ExecutionClient ExecutionRecorder ExecutionSequencer