Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete messages from coordinator after they become final #2471

Merged
merged 17 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
Expand Down Expand Up @@ -149,6 +150,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
if err != nil {
return 0, err
}
return c.signedBytesToMsgCount(ctx, []byte(resStr))
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +483,10 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +506,64 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil || finalized == 0 {
return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
updateFinalizedMsgCount := func() error {
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
if errors.Is(err, redis.Nil) {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
}
return updateFinalizedMsgCount()
}
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := finalized
if msgToDelete > remoteMsgCount {
msgToDelete = remoteMsgCount
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized + 1; msg <= msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err)
}
return updateFinalizedMsgCount()
}
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,6 +594,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
Expand All @@ -534,7 +610,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
8 changes: 8 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arbnode

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -72,6 +73,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url")
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions util/redisutil/redis_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/offchainlabs/nitro/arbutil"
)

const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const WANTS_LOCKOUT_VAL string = "OK"
const INVALID_VAL string = "INVALID"
const INVALID_URL string = "<?INVALID-URL?>"
Expand Down
Loading