Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Aug 8, 2024
1 parent 52bba36 commit 4848a05
Showing 1 changed file with 43 additions and 27 deletions.
70 changes: 43 additions & 27 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ type SeqCoordinatorConfig struct {
SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"`
ReleaseRetries int `koanf:"release-retries"`
// Max message per poll.
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
Signer signature.SignVerifyConfig `koanf:"signer"`
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"`
Signer signature.SignVerifyConfig `koanf:"signer"`
}

func (c *SeqCoordinatorConfig) Url() string {
Expand All @@ -96,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown")
f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind")
f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen")
f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis")
signature.SignVerifyConfigAddOptions(prefix+".signer", f)
}

Expand All @@ -113,23 +115,25 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RetryInterval: 50 * time.Millisecond,
MsgPerPoll: 2000,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
Signer: signature.DefaultSignVerifyConfig,
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

func NewSeqCoordinator(
Expand Down Expand Up @@ -483,14 +487,16 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized == 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
if c.config.DeleteFinalizedMsgs {
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized == 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
Expand All @@ -514,8 +520,14 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error {
if len(keys) > 0 {
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
// To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations
// next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower.
// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
batchDeleteCount := 1000
for i := len(keys); i > 0; i -= batchDeleteCount {
if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
}
}
}
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
Expand Down Expand Up @@ -593,7 +605,11 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
loglevel := log.Error
if err == redis.Nil {
loglevel = log.Debug
}
loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
Expand Down

0 comments on commit 4848a05

Please sign in to comment.