
rawnode: expose per-follower MsgApp message stream #161

Closed
wants to merge 8 commits

Conversation

Contributor

@pav-kv pav-kv commented Feb 8, 2024

Part of #130

@sumeerbhola sumeerbhola left a comment


some very basic questions, due to unfamiliarity with etcd/raft (and fading memory of the Raft thesis).

// entries with indices in (Match,Next) interval are already in flight.
//
// Invariant: 0 <= Match < Next.
Next uint64


Is the default initial index for a new raft group that is not forcing initialization via a snapshot equal to 1 (I realize CockroachDB forces snapshot initialization by using 10)? So Match would initialize to 0, given there is no log? And Next would be 1, so (0,1) (which is empty) is in flight?

Contributor Author

Yeah, sounds right. Need to double-check that the invariants here hold at all times.

In CRDB, these will be promptly promoted to (10,11) after the snapshot initialization.
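For illustration, a minimal Go sketch of the invariant and the two initializations discussed above. The `Progress` struct here is a hypothetical stand-in, not the real `tracker.Progress` type:

```go
package main

import "fmt"

// Progress is a hypothetical mirror of the Match/Next fields discussed above.
type Progress struct {
	Match, Next uint64
}

// checkInvariant asserts 0 <= Match < Next.
func checkInvariant(pr Progress) bool {
	return pr.Match < pr.Next
}

func main() {
	fresh := Progress{Match: 0, Next: 1}  // empty log: (0,1) in flight, i.e. nothing
	crdb := Progress{Match: 10, Next: 11} // after snapshot initialization at index 10
	fmt.Println(checkInvariant(fresh), checkInvariant(crdb)) // true true
}
```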

@@ -218,6 +218,21 @@ type Config struct {
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// DisableEagerAppends makes raft hold off constructing log append messages in
// response to Step() calls. The messages can be collected via a separate
// MessagesTo method.


I'm not very clear on usage, specifically compared to how we currently call RawNode.Ready() in CockroachDB, and then send the outbound messages in Ready.Messages.

If this is set to true, what will Ready.Messages contain? Is it only MsgStorageAppend and MsgStorageApply messages to the local store?

How will the user know to call MessagesTo -- presumably the user has to know that there is something available to send to a replica? Is the idea that the user will retrieve tracker.Progress and somehow know that there are entries available >= Progress.Next, and then decide to call MessagesTo? If it gets back (Progress.Next, j] and sends them, will it then call Progress.SentEntries(j) that will update Progress.Next to j+1? Isn't Progress just a struct, so how will this Progress state change go back into the RawNode?

Does etcd/raft assume that whatever has been retrieved in MessagesTo is the responsibility of the caller to reliably deliver, or does it itself retry? That is, who is responsible for ensuring that (Progress.Match,Progress.Next) is delivered? I am also murky on whether everything <= Progress.Match has indeed been delivered, because of the following comment in the CockroachDB code that says:

		// In state ProgressStateReplicate, the Match index is optimistically
		// updated whenever a message is *sent* (not received). Due to Raft
		// flow control, only a reasonably small amount of data can be en
		// route to a given follower at any point in time.

Contributor Author

@pav-kv pav-kv Feb 8, 2024

Ready() currently returns all messages: this includes appends/snapshots to followers, appends / applies to local storage, heartbeats, etc.

With DisableEagerAppends it would still return all these messages, except MsgApp and snapshots. These would go through the MessagesTo interface. In CRDB, we would still need to call and act on Ready. But we would also call MessagesTo alongside Ready (or may do so separately / on a different schedule, since it's not necessary to pair it with Ready calls).

We would not use / update Progress structs, raft internally does so when it emits messages.

How will the user know to call MessagesTo -- presumably the user has to know that there is something available to send to a replica?

We also don't necessarily need to peek into Progress. The MessagesTo calls will return appends if they need to happen, and won't return anything if Progress indicates that the follower is up-to-date or throttled.

Does etcd/raft assume that whatever has been retrieved in MessagesTo is the responsibility of the caller to reliably deliver, or does it itself retry?

The expectation is that these messages are going to be sent. There is no reliability expectation: raft assumes that any message can be dropped. Raft will retry periodically, and if it detects that the append message stream was broken, it will resend the portion of the log that was not acked.

Entries returned from MessagesTo will now be considered in-flight by raft.

because of the following comment in the CockroachDB code that says

The comment is inaccurate. It is the Next index that's optimistically updated when a message is sent. The Match index is a guaranteed durable match (because the follower replied with a MsgAppResp acking log indices up to Match).

Contributor Author

One thing we need to know to control this is the IDs of the followers, because MessagesTo takes the node ID of the recipient.

One way to know the IDs is iterating over the Progress map, or looking at the current config. We need to be careful with config changes though; I haven't considered the implications yet.


There is no reliability expectation, raft assumes that all messages can be dropped. Raft will retry things periodically, and if it detects that the appends message stream was broken, it will resend the portion of the log that was not acked.

So etcd/raft takes on the responsibility of resending (Match, Next)? With this inversion of control via the MessagesTo interface, will that retry also happen via this interface?

So something like the following where all messages are size 1:

  • Match=5
  • MessagesTo(MaxMsgAppBytes: 3) returns 6, 7, 8. Next=9
  • MessagesTo(MaxMsgAppBytes: 2) returns 9, 10. Next=11
  • Match=6
  • Raft decides to retry 7 onwards.
  • MessagesTo(MaxMsgAppBytes: 1) returns 7
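The sequence above can be played out with a toy model of the leader-side bookkeeping. All names here are hypothetical (not the real etcd/raft API), and every entry is size 1, so the byte budget counts entries:

```go
package main

import "fmt"

// flow is a toy model of a per-follower Progress: match/next mirror the
// Match/Next indices discussed above.
type flow struct {
	match, next uint64
}

// messagesTo returns up to max entry indices starting at next, and
// optimistically advances next (every entry is size 1 in this model).
func (f *flow) messagesTo(max uint64) []uint64 {
	var out []uint64
	for i := uint64(0); i < max; i++ {
		out = append(out, f.next+i)
	}
	f.next += max
	return out
}

// retryFrom models raft detecting a broken stream and rolling Next back.
func (f *flow) retryFrom(idx uint64) { f.next = idx }

func main() {
	f := &flow{match: 5, next: 6}
	fmt.Println(f.messagesTo(3), f.next) // [6 7 8] 9
	fmt.Println(f.messagesTo(2), f.next) // [9 10] 11
	f.match = 6
	f.retryFrom(7) // raft decides to retry 7 onwards
	fmt.Println(f.messagesTo(1), f.next) // [7] 8
}
```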

Contributor Author

@pav-kv pav-kv Feb 8, 2024

Yes, the retries happen like you described, transparently to the application-level caller. The application just needs to send the messages.

Best results are achieved if these messages are delivered in order though (that's also the case today), because each "optimistic" MsgApp builds on top of the previous ones. But if any message is dropped and the flow is "interrupted", raft will learn this (via MsgAppResp responses), roll back Next, and retry.

Contributor

How will the user know to call MessagesTo

I'm also curious about this. Will this be hooked in to RawNode.HasReady? Will we introduce a new HasMessagesTo(FlowControl) API? What will indicate to CockroachDB that a replica needs to be scheduled on the raft scheduler?

Contributor Author

For posterity: the supposed flow here is that the upper layer (e.g. CRDB) tracks all follower flows in StateReplicate which are "ready" to send entries. The condition for this is Next <= raftLog.lastIndex() && !Inflights.Full(). The tracking is "perfect", in that raft will expose all events that help track this info and keep it consistent with its own Progress (e.g. going in/out of StateLeader and StateReplicate, config changes that remove a follower, etc).

Then, on Ready processing, the caller goes through all "ready" flows and calls this method to send messages. The "perfect" tracking helps avoid wasted calls.
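A minimal sketch of that readiness predicate, with hypothetical types (the real Inflights tracker is modeled here as a plain bool):

```go
package main

import "fmt"

// progress is a hypothetical mirror of the leader-side state consulted above.
type progress struct {
	next          uint64
	inflightsFull bool
}

// readyToSend reports whether a follower flow in StateReplicate has
// something to send: Next <= raftLog.lastIndex() && !Inflights.Full().
func readyToSend(pr progress, lastIndex uint64) bool {
	return pr.next <= lastIndex && !pr.inflightsFull
}

func main() {
	fmt.Println(readyToSend(progress{next: 11}, 15))                      // true: entries 11..15 pending
	fmt.Println(readyToSend(progress{next: 16}, 15))                      // false: follower is up-to-date
	fmt.Println(readyToSend(progress{next: 11, inflightsFull: true}, 15)) // false: flow is throttled
}
```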

@@ -372,6 +393,10 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message
// disableEagerAppends instructs append message construction and sending until
// the Ready() call. This improves batching and allows better resource
Contributor

until the Ready() call

Until the MessagesTo() call?

@@ -136,6 +136,29 @@ func (rn *RawNode) Ready() Ready {
return rd
}

// FlowControl tunes the volume and types of messages that GetMessages call can
// return to the application.
type FlowControl struct {
Contributor

Did you explore pushing FlowControl as an interface into etcd/raft, similar to the library's relationship with the Storage interface? I would imagine that this would allow us to hide the interactions with the optional flow control without needing to change the external interface of the library.

We probably still would want a DisableEagerAppends option so that the library could be configured to only consult FlowControl on calls to Ready()/HasReady(). These functions would then loop over the peers and construct messages for each according to the FlowControl state. Pushing this into etcd/raft would also facilitate fast-paths for replica states (e.g. followers) that cannot send MsgApps and need not consult FlowControl.
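To make the suggestion concrete, one hypothetical shape for such an interface (names are illustrative only; the PR instead exposes a FlowControl struct, and the real etcd/raft API differs):

```go
package main

import "fmt"

// FlowControl, pushed into the library as an interface, could look roughly
// like this: the library consults it before constructing MsgApp messages.
type FlowControl interface {
	// MaxBytes returns how many MsgApp payload bytes may be sent to the
	// given follower right now; 0 means the flow is throttled.
	MaxBytes(to uint64) uint64
}

// unlimited is a trivial implementation the library could default to when
// the application supplies no flow control.
type unlimited struct{}

func (unlimited) MaxBytes(uint64) uint64 { return 1 << 30 }

func main() {
	var fc FlowControl = unlimited{}
	fmt.Println(fc.MaxBytes(2) > 0) // true
}
```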

Contributor Author

@pav-kv pav-kv Feb 23, 2024

Both ways extend the surface of the library, but making FlowControl an interface makes the interaction indirect and implicit. I'm slightly in favour of avoiding this inversion of control, although I'm still exploring options.

Good point on the fast-path. It would be good to avoid something like a scan of all followers on every iteration.

Signed-off-by: Pavel Kalinnikov <[email protected]>
This commit adds a Progress.pendingCommit field tracking the highest
commit index <= Next-1 which the leader sent to the follower. It is used
to distinguish cases when a commit index update needs or doesn't need to
be sent to a follower.

Signed-off-by: Pavel Kalinnikov <[email protected]>
Contributor Author

pav-kv commented Jun 3, 2024

Superseded by cockroachdb/cockroach#125002 in CRDB.

@pav-kv pav-kv closed this Jun 3, 2024
@pav-kv pav-kv deleted the flow-control branch June 3, 2024 21:09
4 participants