Patch summary (free text ahead of the first "diff --git" line; patch tools skip it):
  * Adds a SeqCoordinatorConfig.DeleteFinalizedMsgs option ("delete-finalized-msgs"),
    set to true in both the default and the test config, and gates the
    finalized-message cleanup in updateWithLockout on it.
  * Deletes finalized redis keys in batches of 1000 instead of a single DEL call.
  * Logs a missing remote finalized message count at Debug when the error is
    redis.Nil and at Error otherwise.
  * Review fix: the re-indented log.Warn in updateWithLockout now passes the error
    as an "err" key/value pair instead of using a "%w" verb in the message, which
    the structured logger does not interpret.
  * NOTE(review): tab indentation and column alignment were reconstructed from a
    whitespace-collapsed copy of this patch - confirm against the target file
    before applying.

diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go
index dad271c168..e3eb9f1df1 100644
--- a/arbnode/seq_coordinator.go
+++ b/arbnode/seq_coordinator.go
@@ -70,9 +70,10 @@ type SeqCoordinatorConfig struct {
 	SafeShutdownDelay     time.Duration `koanf:"safe-shutdown-delay"`
 	ReleaseRetries        int           `koanf:"release-retries"`
 	// Max message per poll.
-	MsgPerPoll arbutil.MessageIndex       `koanf:"msg-per-poll"`
-	MyUrl      string                     `koanf:"my-url"`
-	Signer     signature.SignVerifyConfig `koanf:"signer"`
+	MsgPerPoll          arbutil.MessageIndex       `koanf:"msg-per-poll"`
+	MyUrl               string                     `koanf:"my-url"`
+	DeleteFinalizedMsgs bool                       `koanf:"delete-finalized-msgs"`
+	Signer              signature.SignVerifyConfig `koanf:"signer"`
 }
 
 func (c *SeqCoordinatorConfig) Url() string {
@@ -96,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown")
 	f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind")
 	f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen")
+	f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis")
 	signature.SignVerifyConfigAddOptions(prefix+".signer", f)
 }
 
@@ -113,23 +115,25 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
 	RetryInterval:         50 * time.Millisecond,
 	MsgPerPoll:            2000,
 	MyUrl:                 redisutil.INVALID_URL,
+	DeleteFinalizedMsgs:   true,
 	Signer:                signature.DefaultSignVerifyConfig,
 }
 
 var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
-	Enable:            false,
-	RedisUrl:          "",
-	LockoutDuration:   time.Second * 2,
-	LockoutSpare:      time.Millisecond * 10,
-	SeqNumDuration:    time.Minute * 10,
-	UpdateInterval:    time.Millisecond * 10,
-	HandoffTimeout:    time.Millisecond * 200,
-	SafeShutdownDelay: time.Millisecond * 100,
-	ReleaseRetries:    4,
-	RetryInterval:     time.Millisecond * 3,
-	MsgPerPoll:        20,
-	MyUrl:             redisutil.INVALID_URL,
-	Signer:            signature.DefaultSignVerifyConfig,
+	Enable:              false,
+	RedisUrl:            "",
+	LockoutDuration:     time.Second * 2,
+	LockoutSpare:        time.Millisecond * 10,
+	SeqNumDuration:      time.Minute * 10,
+	UpdateInterval:      time.Millisecond * 10,
+	HandoffTimeout:      time.Millisecond * 200,
+	SafeShutdownDelay:   time.Millisecond * 100,
+	ReleaseRetries:      4,
+	RetryInterval:       time.Millisecond * 3,
+	MsgPerPoll:          20,
+	MyUrl:               redisutil.INVALID_URL,
+	DeleteFinalizedMsgs: true,
+	Signer:              signature.DefaultSignVerifyConfig,
 }
 
 func NewSeqCoordinator(
@@ -483,14 +487,16 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
 		return c.noRedisError()
 	}
 	// Was, and still is, the active sequencer
-	// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
-	finalized, err := c.sync.GetFinalizedMsgCount(ctx)
-	if err != nil {
-		log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
-	} else if finalized == 0 {
-		log.Warn("SyncMonitor returned zero finalizedMessageCount")
-	} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
-		log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
+	if c.config.DeleteFinalizedMsgs {
+		// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
+		finalized, err := c.sync.GetFinalizedMsgCount(ctx)
+		if err != nil {
+			log.Warn("Error getting finalizedMessageCount from syncMonitor", "err", err)
+		} else if finalized == 0 {
+			log.Warn("SyncMonitor returned zero finalizedMessageCount")
+		} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
+			log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
+		}
 	}
 	// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
 	marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
@@ -514,8 +520,14 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
 func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
 	deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error {
 		if len(keys) > 0 {
-			if err := c.Client.Del(ctx, keys...).Err(); err != nil {
-				return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
+			// To support cases during init we delete keys from reverse (i.e. lowest seq num first), so that even if deletion fails in one of the iterations
+			// next time deleteFinalizedMsgsFromRedis is called we don't miss undeleted messages, as exists is checked from higher seqnum to lower.
+			// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
+			batchDeleteCount := 1000
+			for i := len(keys); i > 0; i -= batchDeleteCount {
+				if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
+					return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
+				}
 			}
 		}
 		finalizedBytes, err := c.msgCountToSignedBytes(finalized)
@@ -593,7 +605,11 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
 	}
 	remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
 	if err != nil {
-		log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
+		loglevel := log.Error
+		if err == redis.Nil {
+			loglevel = log.Debug
+		}
+		loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
 	}
 	remoteMsgCount, err := c.GetRemoteMsgCount()
 	if err != nil {