Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete messages from coordinator after they become final #2471

Merged
merged 17 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 76 additions & 6 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
Expand Down Expand Up @@ -149,6 +150,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

// getRemoteFinalizedMsgCount reads the value stored in redis under
// redisutil.FINALIZED_MSG_COUNT_KEY and decodes it with signedBytesToMsgCount.
//
// Any error from the redis GET is returned unchanged together with a zero
// count, so callers can still match it with errors.Is (for example against
// redis.Nil when the key has never been written).
func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
	signedCount, getErr := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
	if getErr == nil {
		return c.signedBytesToMsgCount(ctx, []byte(signedCount))
	}
	return 0, getErr
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +483,15 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could fail if the parent chain doesn't support finalized blocks.
For now, I think a good solution would be a boolean option (default: true) that enables deleting finalized messages.

if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized == 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason you are checking for zero?
A little after initialization it is a valid return value and people will ask us if they get warnings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking for 0 because the sequencer coordinator could be enabled without an inboxreader/l1reader, and an l1reader is required to fetch finalizedMsgCount. When the inboxreader/l1reader is nil, the sync monitor is made to return 0, nil, so zero seemed a reasonable return value to check when deciding whether to skip deleting finalized messages.

} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
// We leave a margin of error of either five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +511,56 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
updateFinalizedMsgCount := func() error {
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if errors.Is(err, redis.Nil) {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var keys []string
for msg := finalized - 1; msg > 0; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
}
return updateFinalizedMsgCount()
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client)
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized; msg < msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err)
}
return updateFinalizedMsgCount()
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,19 +591,20 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount > remoteFinalizedMsgCount {
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,94 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
}

}

// TestSeqCoordinatorDeletesFinalizedMessages exercises
// deleteFinalizedMsgsFromRedis against a test redis instance: first while the
// finalizedMsgCount key is absent, then again once the first call has set it.
func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Symmetric signing with an empty key and signature verification disabled.
	baseConfig := TestSeqCoordinatorConfig
	baseConfig.LockoutDuration = 100 * time.Millisecond
	baseConfig.LockoutSpare = 10 * time.Millisecond
	baseConfig.Signer.ECDSA.AcceptSequencer = false
	baseConfig.Signer.SymmetricFallback = true
	baseConfig.Signer.SymmetricSign = true
	baseConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
	baseConfig.Signer.Symmetric.SigningKey = ""

	nullSigner, err := signature.NewSignVerify(&baseConfig.Signer, nil, nil)
	Require(t, err)

	baseConfig.RedisUrl = redisutil.CreateTestRedis(ctx, t)

	nodeConfig := baseConfig
	nodeConfig.MyUrl = "test"
	redisCoordinator, err := redisutil.NewRedisCoordinator(nodeConfig.RedisUrl)
	Require(t, err)
	coordinator := &SeqCoordinator{
		RedisCoordinator: *redisCoordinator,
		config:           nodeConfig,
		signer:           nullSigner,
	}

	// countExisting reports how many of the given keys are present in redis.
	countExisting := func(ks []string) int64 {
		found, err := coordinator.Client.Exists(ctx, ks...).Result()
		Require(t, err)
		return found
	}
	// remoteFinalized reads back the finalizedMsgCount stored in redis.
	remoteFinalized := func() arbutil.MessageIndex {
		count, err := coordinator.getRemoteFinalizedMsgCount(ctx)
		Require(t, err)
		return count
	}

	// Populate redis with messages 1..10 and their signatures. keys ends up
	// holding two entries per message: message key, then signature key.
	var keys []string
	msgBytes, err := coordinator.msgCountToSignedBytes(0)
	Require(t, err)
	for idx := arbutil.MessageIndex(1); idx <= 10; idx++ {
		for _, key := range []string{redisutil.MessageKeyFor(idx), redisutil.MessageSigKeyFor(idx)} {
			Require(t, coordinator.Client.Set(ctx, key, msgBytes, time.Hour).Err())
			keys = append(keys, key)
		}
	}
	// Publish the message count.
	msgCountBytes, err := coordinator.msgCountToSignedBytes(11)
	Require(t, err)
	Require(t, coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err())
	if countExisting(keys) != 20 {
		t.Fatal("couldn't find all messages and signatures in redis")
	}

	// First call: no finalizedMsgCount key exists yet.
	Require(t, coordinator.deleteFinalizedMsgsFromRedis(ctx, 5))

	// Messages 1..4 occupy keys[0:8] and must be gone.
	if countExisting(keys[:8]) != 0 {
		t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted")
	}

	// The stored finalizedMsgCount must match the value passed in.
	if got := remoteFinalized(); got != 5 {
		t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", got)
	}

	// Second call: a finalizedMsgCount is already stored.
	Require(t, coordinator.deleteFinalizedMsgsFromRedis(ctx, 7))
	if countExisting(keys[8:12]) != 0 {
		t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted")
	}
	if got := remoteFinalized(); got != 7 {
		t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", got)
	}

	// Everything that is not finalized must still be present.
	if countExisting(keys[12:]) != 8 {
		t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available")
	}
}
7 changes: 7 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ func mainImpl() int {
if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer {
log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer)
}
if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable {
log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url")
return 1
}

var dataSigner signature.DataSignerFunc
var l1TransactionOptsValidator *bind.TransactOpts
Expand Down
13 changes: 7 additions & 6 deletions util/redisutil/redis_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/offchainlabs/nitro/arbutil"
)

const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
// Redis keys and key prefixes used by the sequencer coordinator. The trailing
// comment on each constant notes who writes the corresponding entry.
const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only
const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key
const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key
const PRIORITIES_KEY string = "coordinator.priorities" // Read only
const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self
const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN
const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN
const WANTS_LOCKOUT_VAL string = "OK"
const INVALID_VAL string = "INVALID"
const INVALID_URL string = "<?INVALID-URL?>"
Expand Down
Loading