diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cdf1011b11..64b1ef9b81 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -39,6 +39,7 @@ type SeqCoordinator struct { redisutil.RedisCoordinator + sync *SyncMonitor streamer *TransactionStreamer sequencer execution.ExecutionSequencer delayedSequencer *DelayedSequencer @@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ RedisUrl: "", LockoutDuration: time.Minute, LockoutSpare: 30 * time.Second, - SeqNumDuration: 24 * time.Hour, + SeqNumDuration: 10 * 24 * time.Hour, UpdateInterval: 250 * time.Millisecond, HandoffTimeout: 30 * time.Second, SafeShutdownDelay: 5 * time.Second, @@ -149,6 +150,7 @@ func NewSeqCoordinator( } coordinator := &SeqCoordinator{ RedisCoordinator: *redisCoordinator, + sync: sync, streamer: streamer, sequencer: sequencer, config: config, @@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC return nil } +func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result() + if err != nil { + return 0, err + } + return c.signedBytesToMsgCount(ctx, []byte(resStr)) +} + func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) { resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result() if errors.Is(err, redis.Nil) { @@ -473,6 +483,10 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } // Was, and still is, the active sequencer + // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key + if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil { + log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) + } // We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5) if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) { @@ -492,6 +506,64 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } +func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error { + finalized, err := c.sync.GetFinalizedMsgCount(ctx) + if err != nil || finalized == 0 { + return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err) + } + updateFinalizedMsgCount := func() error { + finalizedBytes, err := c.msgCountToSignedBytes(finalized) + if err != nil { + return err + } + if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil { + return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err) + } + return nil + } + prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) + if err != nil { + if errors.Is(err, redis.Nil) { + var keys []string + for msg := finalized; ; msg-- { + exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() + if exists == 0 || err != nil { + break + } + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another + if len(keys) > 0 { + log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) + } + } + return updateFinalizedMsgCount() + } + return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) + } + remoteMsgCount, err := c.GetRemoteMsgCount() + if err != nil { + return fmt.Errorf("cannot get remote message count: %w", err) + } + msgToDelete := finalized + if msgToDelete > remoteMsgCount { + msgToDelete = remoteMsgCount + } + if prevFinalized < msgToDelete { + var keys []string + for msg := prevFinalized + 1; msg <= msgToDelete; msg++ { + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err) + } + return updateFinalizedMsgCount() + } + return nil +} + func (c *SeqCoordinator) update(ctx context.Context) time.Duration { chosenSeq, err := c.RecommendSequencerWantingLockout(ctx) if err != nil { @@ -522,6 +594,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Error("cannot read message count", "err", err) return c.config.UpdateInterval } + remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) + if err != nil { + log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) + } remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { log.Warn("cannot get remote message count", "err", err) @@ -534,7 +610,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error - for msgToRead < readUntil { + for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { var resString string resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() if msgReadErr != nil { diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index d3b9a7e1c6..27da6b7331 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -2,6 +2,7 @@ package arbnode import ( "context" + "errors" "sync" "time" @@ -72,6 +73,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex { return s.syncTarget } +func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + if s.inboxReader != nil && s.inboxReader.l1Reader != nil { + return s.inboxReader.GetFinalizedMsgCount(ctx) + } + return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url") +} + func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { msgCount, err := s.txStreamer.GetMessageCount() if err != nil { diff --git a/util/redisutil/redis_coordinator.go b/util/redisutil/redis_coordinator.go index 59e3b0e0f9..2c12ffec50 100644 --- a/util/redisutil/redis_coordinator.go +++ b/util/redisutil/redis_coordinator.go @@ -13,12 +13,13 @@ import ( "github.com/offchainlabs/nitro/arbutil" ) -const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only -const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key -const PRIORITIES_KEY string = "coordinator.priorities" // Read only -const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self -const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN -const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN +const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only +const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key +const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key +const PRIORITIES_KEY string = "coordinator.priorities" // Read only +const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self +const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN +const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN const WANTS_LOCKOUT_VAL string = "OK" const INVALID_VAL string = "INVALID" const INVALID_URL string = ""