-
Notifications
You must be signed in to change notification settings - Fork 473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delete messages from coordinator after they become final #2471
Changes from 6 commits
9745003
3dd19d9
ab8e6a8
4cff8b3
2bea7be
e6cd9a4
08c7e9e
99e6032
379bf7e
ae1dbec
52bba36
4848a05
d8b056c
45eaee9
71c7324
ae4cc86
bccdbe9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ type SeqCoordinator struct { | |
|
||
redisutil.RedisCoordinator | ||
|
||
sync *SyncMonitor | ||
streamer *TransactionStreamer | ||
sequencer execution.ExecutionSequencer | ||
delayedSequencer *DelayedSequencer | ||
|
@@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ | |
RedisUrl: "", | ||
LockoutDuration: time.Minute, | ||
LockoutSpare: 30 * time.Second, | ||
SeqNumDuration: 24 * time.Hour, | ||
SeqNumDuration: 10 * 24 * time.Hour, | ||
UpdateInterval: 250 * time.Millisecond, | ||
HandoffTimeout: 30 * time.Second, | ||
SafeShutdownDelay: 5 * time.Second, | ||
|
@@ -149,6 +150,7 @@ func NewSeqCoordinator( | |
} | ||
coordinator := &SeqCoordinator{ | ||
RedisCoordinator: *redisCoordinator, | ||
sync: sync, | ||
streamer: streamer, | ||
sequencer: sequencer, | ||
config: config, | ||
|
@@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC | |
return nil | ||
} | ||
|
||
func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { | ||
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result() | ||
if err != nil { | ||
return 0, err | ||
} | ||
return c.signedBytesToMsgCount(ctx, []byte(resStr)) | ||
} | ||
|
||
func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) { | ||
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result() | ||
if errors.Is(err, redis.Nil) { | ||
|
@@ -473,6 +483,15 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin | |
return c.noRedisError() | ||
} | ||
// Was, and still is, the active sequencer | ||
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key | ||
finalized, err := c.sync.GetFinalizedMsgCount(ctx) | ||
if err != nil { | ||
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) | ||
} else if finalized == 0 { | ||
log.Warn("SyncMonitor returned zero finalizedMessageCount") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a specific reason you are checking for zero? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was checking for 0 because sequencer coordinator could be enabled without inboxreader/l1reader and l1reader is required to fetch finalizedMsgCount. In cases when inboxreader/l1reader is nil, sync monitor is made to return |
||
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { | ||
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) | ||
} | ||
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. | ||
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5) | ||
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) { | ||
|
@@ -492,6 +511,56 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin | |
return c.noRedisError() | ||
} | ||
|
||
func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error { | ||
updateFinalizedMsgCount := func() error { | ||
finalizedBytes, err := c.msgCountToSignedBytes(finalized) | ||
if err != nil { | ||
return err | ||
} | ||
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil { | ||
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err) | ||
} | ||
return nil | ||
} | ||
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) | ||
if errors.Is(err, redis.Nil) { | ||
ganeshvanahalli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var keys []string | ||
for msg := finalized - 1; msg > 0; msg-- { | ||
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() | ||
if exists == 0 || err != nil { | ||
diegoximenes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break | ||
} | ||
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) | ||
} | ||
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another | ||
if len(keys) > 0 { | ||
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) | ||
if err := c.Client.Del(ctx, keys...).Err(); err != nil { | ||
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) | ||
} | ||
} | ||
return updateFinalizedMsgCount() | ||
} else if err != nil { | ||
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) | ||
} | ||
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client) | ||
if err != nil { | ||
return fmt.Errorf("cannot get remote message count: %w", err) | ||
} | ||
msgToDelete := min(finalized, remoteMsgCount) | ||
if prevFinalized < msgToDelete { | ||
var keys []string | ||
for msg := prevFinalized; msg < msgToDelete; msg++ { | ||
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) | ||
} | ||
if err := c.Client.Del(ctx, keys...).Err(); err != nil { | ||
return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err) | ||
} | ||
return updateFinalizedMsgCount() | ||
diegoximenes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return nil | ||
} | ||
|
||
func (c *SeqCoordinator) update(ctx context.Context) time.Duration { | ||
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx) | ||
if err != nil { | ||
|
@@ -522,19 +591,20 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { | |
log.Error("cannot read message count", "err", err) | ||
return c.config.UpdateInterval | ||
} | ||
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) | ||
if err != nil { | ||
log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) | ||
diegoximenes marked this conversation as resolved.
Show resolved
Hide resolved
ganeshvanahalli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
remoteMsgCount, err := c.GetRemoteMsgCount() | ||
if err != nil { | ||
log.Warn("cannot get remote message count", "err", err) | ||
return c.retryAfterRedisError() | ||
} | ||
readUntil := remoteMsgCount | ||
if readUntil > localMsgCount+c.config.MsgPerPoll { | ||
readUntil = localMsgCount + c.config.MsgPerPoll | ||
} | ||
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount) | ||
var messages []arbostypes.MessageWithMetadata | ||
msgToRead := localMsgCount | ||
var msgReadErr error | ||
for msgToRead < readUntil { | ||
for msgToRead < readUntil && localMsgCount > remoteFinalizedMsgCount { | ||
diegoximenes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var resString string | ||
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() | ||
if msgReadErr != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could fail if parent chain doesn't support finalized..
For now, I think a good solution would be to have a boolean (default: true) option that enables deleting finalized messages