Skip to content

Commit

Permalink
Delete messages from coordinator after they become final
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Jul 10, 2024
1 parent de3dcff commit 9745003
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
80 changes: 78 additions & 2 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
Expand Down Expand Up @@ -149,6 +150,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
if err != nil {
return 0, err
}
return c.signedBytesToMsgCount(ctx, []byte(resStr))
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +483,10 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +506,64 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil || finalized == 0 {
return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err)
}
updateFinalizedMsgCount := func() error {
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
}
return updateFinalizedMsgCount()
}
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := finalized
if msgToDelete > remoteMsgCount {
msgToDelete = remoteMsgCount
}
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized + 1; msg <= msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err)
}
return updateFinalizedMsgCount()
}
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,6 +594,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
Expand All @@ -534,7 +610,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
8 changes: 8 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arbnode

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -72,6 +73,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url")
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions util/redisutil/redis_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/offchainlabs/nitro/arbutil"
)

const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const WANTS_LOCKOUT_VAL string = "OK"
const INVALID_VAL string = "INVALID"
const INVALID_URL string = "<?INVALID-URL?>"
Expand Down

0 comments on commit 9745003

Please sign in to comment.