
test: run 2k concurrent transfers with 1MB of data #81

Merged
1 commit merged into ethereum:master on Sep 4, 2023

Conversation

carver
Contributor

@carver carver commented Jun 16, 2023

This used to be a PR that grouped up all the changes needed to pass the high-concurrency tests. Now, with all the changes merged, the test is the only commit left, and it passes against master.


Previous writeup, for posterity

Handle High Concurrency

The minimal functional changes required to pass the high-volume socket test, plus some logs, pulled from #69

The commits should be fairly clean to inspect individually, if you're into that kind of thing.

The functional changes are:

  • Bias the connection event loop to prefer handling inbound packets. Previously, it was possible to randomly choose to time out a connection, even if an inbound packet was waiting on the queue.
  • Use exponential backoff when sending a SYN times out (the test launches a burst of SYNs at once, and exponential backoff helps recover from that stressed condition); a sketch of the idea follows this list.
  • A bit more wiggle room in the default config (longer idle timeout, more connection retries, more DATA packet retries).
  • Bias the socket event loop to prefer handling socket reads (usually UDP). UDP packet drops were happening a lot in high-volume tests. It is much more work to recover from drops than from delays to outbound sending (a side effect of preferring reads), and uTP already handles delays gracefully.
  • Finally, add a limit to the number of connections. This is applied to new outbound connections, but considers the total connection count.
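
For illustration, here is a minimal, self-contained sketch of the SYN backoff idea from the list above. The constants, the function name, and the specific values are assumptions for the example, not the crate's actual implementation:

use std::time::Duration;

// Each unanswered SYN doubles the retransmit timeout, capped at a maximum,
// so a burst of simultaneous connection attempts backs off instead of
// hammering the network. (Illustrative values only.)
fn syn_timeout_for_attempt(attempt: u32) -> Duration {
    const INITIAL_TIMEOUT: Duration = Duration::from_millis(500); // assumed value
    const MAX_TIMEOUT: Duration = Duration::from_secs(8); // assumed value
    INITIAL_TIMEOUT
        .checked_mul(2u32.saturating_pow(attempt))
        .map_or(MAX_TIMEOUT, |t| t.min(MAX_TIMEOUT))
}

fn main() {
    // Prints 500ms, 1s, 2s, 4s, 8s, 8s for attempts 0 through 5.
    for attempt in 0..6 {
        println!("attempt {attempt}: {:?}", syn_timeout_for_attempt(attempt));
    }
}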

Altogether, these changes reliably cause the new test to pass, even when bumped up to 10,000 simultaneously-launched transfers. (Though the simultaneous bit is mostly a red herring now, since the stream prevents them from actually starting at the same time.)

Most of these changes improved the probability of success, but the connection limit was the final piece that made the test apparently reliable. It might be tempting to drop some of the other changes and keep only the connection limit. However, since the connection limit is a configurable value, we want to support high connection counts as well as possible, so it seems worth keeping the earlier functional changes to do a better job when users pick high values.

Notably not present is the "stress tracking" or "busy/idle tracking" from the socket. Both provided interesting information, but were not reliable enough to use to apply back-pressure. The added code complexity wasn't worth the (admittedly quite valuable) logs, IMO.

The logging changes are mostly not visible except in stress/failure situations, but they are very helpful for debugging, so I think we should keep them.

@carver carver requested a review from jacobkaufmann June 16, 2023 23:35
@carver carver marked this pull request as ready for review June 16, 2023 23:38
src/socket.rs Outdated
Comment on lines 258 to 262
let num_connections = self.conns.read().unwrap().len();
if num_connections < self.max_conns {
    break;
}
self.outbound_dropped.notified().await;
Contributor Author

Is this holding the read() lock across the await? Do we need something like:

Suggested change
-let num_connections = self.conns.read().unwrap().len();
-if num_connections < self.max_conns {
-    break;
-}
-self.outbound_dropped.notified().await;
+let num_connections = {
+    self.conns.read().unwrap().len()
+};
+if num_connections < self.max_conns {
+    break;
+}
+self.outbound_dropped.notified().await;

It hasn't seemed to cause problems during testing, but it's obviously worth double-checking.
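
As an aside, here is a small self-contained demonstration of the scoping pattern in that suggestion. The connection map, limit, and sleep below are illustrative stand-ins for the socket's real fields, not the crate's code:

use std::sync::{Arc, RwLock};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Stand-in for the socket's connection map and limit.
    let conns: Arc<RwLock<Vec<u32>>> = Arc::new(RwLock::new(vec![1, 2, 3]));
    let max_conns = 2;

    loop {
        let num_connections = {
            // The read guard lives only inside this block and is dropped at the
            // closing brace, so it is never held across the await below.
            conns.read().unwrap().len()
        };
        if num_connections < max_conns {
            break;
        }
        // No lock is held while we wait for a slot to open up.
        sleep(Duration::from_millis(10)).await;
        conns.write().unwrap().pop(); // simulate a connection closing
    }
    println!("capacity available; an outbound connection could start now");
}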

src/socket.rs Outdated
}

impl UtpSocket<SocketAddr> {
-    pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
+    pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {
Contributor Author

Add a docstring with some hints about how to choose max_conns. Roughly (a draft follows the list below):

  • 100 seems like a reasonable starting place for many use cases; 1k is probably a bit too high, depending on your setup; 10k will likely cause overload; 1-5 is probably so small that you won't get the full potential of the library
  • A too-high value can cause connection timeouts and various logs about packets taking too long to be processed locally
  • A too-low value will cause connections to wait a long time for connect() or connect_with_cid() to return, so you will not see the full performance of the library
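
A possible shape for that docstring, offered as a sketch of the wording rather than the final text:

/// Binds a new uTP socket to `addr`.
///
/// `max_conns` caps the number of concurrently active connections: new outbound
/// connections wait until the total count (inbound and outbound) drops below it.
///
/// Choosing a value:
/// * ~100 is a reasonable starting point for many use cases; 1,000 is probably a
///   bit too high depending on your setup, and 10,000 will likely cause overload.
/// * Very small values (1-5) leave most of the library's potential unused.
/// * Too high: connections may time out, and logs will complain about packets
///   taking too long to be processed locally.
/// * Too low: `connect()` / `connect_with_cid()` can block for a long time waiting
///   for a slot, so you will not see the library's full performance.
pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {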

src/conn.rs Outdated
max_idle_timeout,
max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16,
initial_timeout: congestion::DEFAULT_INITIAL_TIMEOUT,
min_timeout: congestion::DEFAULT_MIN_TIMEOUT,
-max_timeout: max_idle_timeout,
+max_timeout: max_idle_timeout / 4,
Contributor Author

Move the 4 to a file constant, and explain the reasoning
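
Something along these lines, where the constant name and the explanatory comment are only a sketch of what the reasoning might say:

/// Keep the maximum per-packet timeout to a fraction of the idle timeout so that
/// several retransmissions can be attempted before the connection is declared idle.
/// (Hypothetical name and wording.)
const MAX_TIMEOUT_DIVISOR: u32 = 4;

// ...then where the config is built:
max_timeout: max_idle_timeout / MAX_TIMEOUT_DIVISOR,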

Collaborator

@jacobkaufmann jacobkaufmann left a comment

nice work! left some comments on my first pass.

it might be easier to break these up into smaller PRs, since it will become difficult to keep the changes organized during PR back-and-forth, and I'd prefer to squash all commits regardless

I would see a minimum of 3 sets of changes:

  • concurrent connection limits
  • tokio::select biasing
  • exponential backoff for outgoing connection establishment

src/conn.rs Outdated
@@ -249,9 +249,16 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
 tokio::pin!(idle_timeout);
 loop {
     tokio::select! {
+        biased;
Collaborator

instead of biased, could we call try_recv in the idle timeout branch, only registering the idle timeout if that channel is empty?

if we keep the biased, do we need to move any of the other branches (e.g. shutdown)?
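
For concreteness, a self-contained toy version of the try_recv idea. The unbounded channel and zero-delay sleep are stand-ins for the connection's real packet channel and idle timer, not the crate's actual event loop:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut packets) = mpsc::unbounded_channel::<u32>();
    tx.send(7).unwrap();

    tokio::select! {
        Some(packet) = packets.recv() => {
            println!("processed packet {packet}");
        }
        _ = sleep(Duration::from_millis(0)) => {
            // The timeout branch won the race, but a packet may already be
            // queued; check before treating the connection as idle.
            match packets.try_recv() {
                Ok(packet) => println!("timeout raced a queued packet {packet}; handling it instead"),
                Err(_) => println!("genuine idle timeout"),
            }
        }
    }
}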

Contributor Author

Ah, that's an interesting approach that hadn't occurred to me.

The tradeoffs I'm seeing on that choice are:

Pros of switching to try_recv

  • Fewer opportunities to mess up the branch order in subtle ways
  • biased is a less-familiar approach; it requires more understanding from contributors

Cons of switching to try_recv

  • Slight performance loss (biased is a bit faster)
  • In maintenance, we'd want to make sure that any path that resets idle_timeout is also double-checked in the timeout branch. For example, here I guess we'd also want to check writes.try_recv()
  • More lines of code: the timeout processing gets messier. Do we just restart the loop if there is a packet waiting on the queue (in which case we might loop a random number of times before hitting the branch we want)? Or do we actually process the packet in the idle-timeout branch, which seems surprising and leaves room for bugs where someone changes the packet-processing branch without copying the change to the timeout branch?

Based on these tradeoffs, I'm inclined to keep the biased approach.

do we need to move any of the other branches (e.g. shutdown)?

I haven't found any other important races where both branches are notified and we need to process one before the other. We might get some performance benefit from processing writes before reads, or the other way around, but I can't see any place where the order would cause a problem. Eventually, one branch will always drain and we'll move on to the next, so it doesn't seem like we'll get stuck in any particular branch.

So I generally favored changing the order as little as possible. What's your thinking on moving the shutdown branch?

Contributor Author

Another potential benefit of keeping biased is using it to do something like: process all incoming packets before sending the SYN that ACKs the very latest one.

src/conn.rs Outdated
Comment on lines 256 to 291
let queue_time = Instant::now() - receive_time;
if queue_time > self.config.target_delay {
    tracing::warn!(?queue_time, "incoming packet queued for way too long");
} else if queue_time > Duration::from_millis(40) {
    tracing::debug!(?queue_time, "incoming packet queued for too long");
}
Collaborator

were these logs meaningful? I am more inclined to make this sort of info available via metrics, since then we do not have to make a judgement about what is "too long".

src/conn.rs Outdated
@@ -285,10 +286,31 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
_ = self.writable.notified() => {
    self.process_writes(Instant::now());
}
Some(Ok(timeout)) = self.unacked.next() => {
    let (_seq, packet) = timeout;
    //tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");
Collaborator

seems we left this commented out by mistake. was the intention to remove the log?

Contributor Author

Ah yes, it was just very noisy during testing. I'll add it back.

src/conn.rs Outdated
Comment on lines 298 to 313
match self.state {
    State::Closing { local_fin, remote_fin, .. } => {
        if unacked.len() == 1 && local_fin.is_some() && &local_fin.unwrap() == unacked.last().unwrap() {
            tracing::debug!(?unacked, ?local_fin, ?remote_fin, "Idle timeout with both FINs sent, with only FIN unacked.");
        } else {
            tracing::warn!(?unacked, ?local_fin, ?remote_fin, "idle timeout expired while closing...");
        }
    }
    State::Established { .. } => {
        tracing::warn!(?unacked, "idle timeout expired in established connection, closing...");
    }
    State::Connecting { .. } => {
        tracing::warn!(?unacked, "idle timeout expired while connecting, closing...");
    }
    State::Closed { .. } => unreachable!("In an if block that excludes the closed state"),
}
Collaborator

instead of all these different match arms, it would be nicer to have a compact representation of the state that we can emit alongside the idle timeout
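
For example, a hedged sketch of what that compact representation could look like; the helper name and label strings are made up for illustration:

// Hypothetical helper on the connection: collapse the state into a short label
// so a single log line can replace the per-state match arms above.
fn state_summary(&self) -> &'static str {
    match self.state {
        State::Connecting { .. } => "connecting",
        State::Established { .. } => "established",
        State::Closing { local_fin: Some(_), .. } => "closing (local FIN sent)",
        State::Closing { .. } => "closing",
        State::Closed { .. } => "closed",
    }
}

// ...and at the idle-timeout site:
tracing::warn!(state = self.state_summary(), ?unacked, "idle timeout expired, closing...");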

src/conn.rs Outdated
Comment on lines 766 to 809
let send_result = self.socket_events
    .send(SocketEvent::Outgoing((state, self.cid.peer.clone())));
if send_result.is_err() {
    tracing::warn!("Cannot send STATE packet: socket_events closed");
    return;
}
Collaborator

Suggested change
-let send_result = self.socket_events
-    .send(SocketEvent::Outgoing((state, self.cid.peer.clone())));
-if send_result.is_err() {
-    tracing::warn!("Cannot send STATE packet: socket_events closed");
-    return;
-}
+if let Err(err) = self.socket_events.send(SocketEvent::Outgoing((state, self.cid.peer.clone()))) {
+    tracing::warn!("cannot send STATE packet: socket closed channel");
+    return;
+}

src/conn.rs Outdated
Comment on lines 1136 to 1174
let send_result = socket_events.send(SocketEvent::Outgoing((packet, dest.clone())));
if send_result.is_err() {
    tracing::error!("Cannot transmit packet: socket_events closed");
}
Collaborator

Suggested change
-let send_result = socket_events.send(SocketEvent::Outgoing((packet, dest.clone())));
-if send_result.is_err() {
-    tracing::error!("Cannot transmit packet: socket_events closed");
-}
+if let Err(err) = socket_events.send(SocketEvent::Outgoing((packet, dest.clone()))) {
+    tracing::error!("cannot transmit packet: socket closed channel");
+}

this log level is error while similar log above is warn. we should probably make both error

Contributor Author

👍🏻 to unifying the log level. It's fairly obnoxious to have a passing test end with an error log, and it happens pretty regularly, so I'd like to make them both warn.

src/socket.rs Outdated
}

impl UtpSocket<SocketAddr> {
-    pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
+    pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {
Collaborator

I would prefer this go in a socket-level config, since we want to move in that direction anyway (#38)

src/socket.rs Outdated
Comment on lines 31 to 33
/// Maximum allowed connections, which only restricts outbound connections, but counts all
/// active connections.
max_conns: usize,
Collaborator

what is the rationale on restricting only outbound connections?

Contributor Author

Hm, this one is a little fuzzy. I suppose that:

  • outgoing connections feel delayable until we want to send the data, but inbound connections were presumably initiated by someone else, so we can't just wait as long as we want before accepting
  • we might have a preference to receive data (help ourselves) over sending it (help others) if we are resource-constrained

Any thoughts on this? I suppose the alternative would be to add the same pausing mechanism into the accepts.

Collaborator

I think the first point is more important than the second.

if we delay accepting a connection, the initiating endpoint may give up on it before we accept, and then when we do accept it, we will consume a connection slot until the idle timeout.

uTP streams are bidirectional, and the accepting endpoint of a uTP stream is not necessarily the endpoint receiving data, so I don't think we can make assumptions about one endpoint being the sender and the other being the receiver.

that said, I think it's okay to only limit outgoing connections if we make it clear from the config that it is not a limit on total connections. if it becomes evident that an unlimited number of incoming (i.e. accepted) connections is harming outgoing connections, then we can address it with a different approach.

@carver
Contributor Author

carver commented Aug 9, 2023

Test error discussed in #103

@carver carver force-pushed the handle-high-concurrency branch 3 times, most recently from 1a1b0c3 to ea06fb9 on September 2, 2023 00:38
Now we can reliably handle the launching of many more streams, so let's
really stress-test the library.

This has helped us identify and resolve a number of bugs in local
testing. Now that they seem to be fixed, we can merge to master to
prevent regressions.

Bonus cleanups on @njgheorghita's original change:
- Change the high end of the range to equal the number of concurrent
  streams
- Log showing the transfer speed in socket test
- Run an optimized build of utp when testing, for more performance (and
  interesting results) in the high-concurrency tests
- Return from the test as soon as an error is found, cancelling all
  in-flight transfers
@carver carver force-pushed the handle-high-concurrency branch from 6e041ae to daaeb19 on September 4, 2023 22:35
@carver carver changed the title from "Handle high concurrency" to "test: run 2k concurrent transfers with 1MB of data" on Sep 4, 2023
@carver carver merged commit c96ed44 into ethereum:master Sep 4, 2023
@carver carver deleted the handle-high-concurrency branch September 4, 2023 22:40