test: run 2k concurrent transfers with 1MB of data #81
Conversation
src/socket.rs
Outdated
```rust
let num_connections = self.conns.read().unwrap().len();
if num_connections < self.max_conns {
    break;
}
self.outbound_dropped.notified().await;
```
Is this holding the read() lock across the await? Do we need something like:
```rust
let num_connections = {
    self.conns.read().unwrap().len()
};
if num_connections < self.max_conns {
    break;
}
self.outbound_dropped.notified().await;
```
It hasn't seemed to cause problems during testing, but it's obviously worth double-checking.
src/socket.rs
Outdated
```diff
 }

 impl UtpSocket<SocketAddr> {
-    pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
+    pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {
```
Add a docstring with some hints about how to choose `max_conns`. Roughly:
- 100 seems like a reasonable starting place for a lot of use cases; 1k is probably a bit too high; depending on your setup, 10k will likely cause overload; 1-5 is probably so small that you won't get the full potential of the library
- A too-high value can cause connection timeouts and various logs about packets taking too long to be processed locally
- A too-low value will cause connections to wait a long time for `connect()` or `connect_with_cid()` to return, and you will not experience the full performance of the library
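For illustration, such a docstring might read roughly like this (a sketch of the guidance above, not final wording):

```rust
impl UtpSocket<SocketAddr> {
    /// Binds a uTP socket on `addr`.
    ///
    /// `max_conns` caps the number of concurrent outbound connections:
    /// * around 100 is a reasonable starting point for a lot of use cases,
    /// * 1k is probably a bit too high, and 10k will likely cause overload,
    /// * 1-5 is so small that you won't get the full potential of the library.
    ///
    /// Too high: connection timeouts and logs about packets taking too long to be
    /// processed locally. Too low: `connect()` / `connect_with_cid()` take a long
    /// time to return, and you won't see the library's full performance.
    pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {
        // ...
    }
}
```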
src/conn.rs
Outdated
```diff
     max_idle_timeout,
     max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16,
     initial_timeout: congestion::DEFAULT_INITIAL_TIMEOUT,
     min_timeout: congestion::DEFAULT_MIN_TIMEOUT,
-    max_timeout: max_idle_timeout,
+    max_timeout: max_idle_timeout / 4,
```
Move the 4 to a file constant, and explain the reasoning
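For example, something along these lines; the constant name and the stated reasoning are only my guess at the intent:

```rust
/// Cap a connection's maximum retransmission timeout at this fraction of the idle
/// timeout, so that several retransmission attempts can fire before the connection
/// is declared idle and torn down. (Guessed rationale; replace with the real one.)
const MAX_TIMEOUT_IDLE_FRACTION: u32 = 4;

// ...then at the construction site:
// max_timeout: max_idle_timeout / MAX_TIMEOUT_IDLE_FRACTION,
```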
nice work! left some comments on my first pass.
it might be easier to break these up into smaller PRs, since it will become difficult to keep the changes organized across the PR back-and-forth, and I'd prefer to squash all commits regardless
I would see a minimum of 3 sets of changes:
- concurrent connection limits
- `tokio::select` biasing
- exponential backoff for outgoing connection establishment
src/conn.rs
Outdated
```diff
@@ -249,9 +249,16 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
     tokio::pin!(idle_timeout);
     loop {
         tokio::select! {
+            biased;
```
instead of `biased`, could we call `try_recv` in the idle timeout branch, only registering the idle timeout if that channel is empty?

if we keep the `biased`, do we need to move any of the other branches (e.g. shutdown)?
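For concreteness, a minimal, self-contained sketch of that `try_recv` pattern; the channel, names, and timings are illustrative only, not the project's actual code:

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{sleep, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut incoming) = mpsc::unbounded_channel::<u32>();
    tx.send(1).unwrap(); // one queued "packet"; `tx` stays alive so `recv` can go pending

    let idle = Duration::from_millis(100);
    let idle_timeout = sleep(idle);
    tokio::pin!(idle_timeout);

    loop {
        tokio::select! {
            Some(seq) = incoming.recv() => {
                println!("processed packet {seq}");
                // Any real work re-arms the idle timer.
                idle_timeout.as_mut().reset(Instant::now() + idle);
            }
            () = &mut idle_timeout => {
                // Only register a true idle timeout if the channel really is empty.
                if let Ok(seq) = incoming.try_recv() {
                    println!("packet {seq} was still queued; not actually idle");
                    idle_timeout.as_mut().reset(Instant::now() + idle);
                } else {
                    println!("idle timeout");
                    break;
                }
            }
        }
    }
}
```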
Ah, that's an interesting approach that hadn't occurred to me.
The tradeoffs I'm seeing on that choice are:
Pros of switching to `try_recv`:
- Fewer opportunities to mess up the branch order in subtle ways
- `biased` is a less-familiar approach; it requires more understanding from contributors
Cons of switching to `try_recv`:
- Slight performance loss (`biased` is a bit faster)
- In maintenance, we'd want to make sure that any path that resets `idle_timeout` is also double-checked in the timeout branch. For example here, I guess we'd also want to check `writes.try_recv()`
- More lines of code: the timeout processing gets messier. Do we just restart the loop if there is a packet waiting on the queue (in which case we might loop a random number of times before hitting the branch we want)? Or do we actually process the packet in the idle-timeout branch, which seems surprising and leaves room open for bugs where someone makes a change in the packet-processing branch that doesn't get copied to the timeout branch?
Based on these tradeoffs, I'm inclined to keep the `biased` approach.
> do we need to move any of the other branches (e.g. shutdown)?
I haven't found any other important races where both branches are notified and we need to process one before the other. We might get some performance benefit from processing writes before reads, or the other way around, but I can't see any place where it would cause a problem. Eventually, one branch will always drain out and we'll move on to the next one. It doesn't seem we'll get stuck in any particular branch.
So I generally favored changing the order as little as possible. What's your thinking on moving the shutdown branch?
Another potential benefit of keeping `biased` is using it to do something like: process all incoming packets before sending the SYN that ACKs the very latest one.
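A tiny self-contained illustration of that ordering effect (channel and variable names are made up for the example):

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (tx, mut incoming) = mpsc::unbounded_channel::<u32>();
    for seq in 0..3 {
        tx.send(seq).unwrap();
    }

    let mut latest = None;
    loop {
        tokio::select! {
            biased;
            // Both branches can be ready at once; `biased` polls top to bottom, so
            // every queued packet is drained before the lower-priority branch runs...
            Some(seq) = incoming.recv() => {
                latest = Some(seq);
            }
            // ...and by the time we get here, the ACK covers the newest packet.
            () = sleep(Duration::ZERO) => {
                println!("acking {latest:?}");
                break;
            }
        }
    }
}
```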
src/conn.rs
Outdated
```rust
let queue_time = Instant::now() - receive_time;
if queue_time > self.config.target_delay {
    tracing::warn!(?queue_time, "incoming packet queued for way too long");
} else if queue_time > Duration::from_millis(40) {
    tracing::debug!(?queue_time, "incoming packet queued for too long");
}
```
were these logs meaningful? I am more inclined to make this sort of info available via metrics, since then we do not have to make a judgement about what is "too long".
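For what it's worth, a minimal metrics-style alternative could look like this sketch; a real implementation would presumably use whatever metrics facility the project standardizes on:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Records queue delay as data instead of judging "too long" at the log site.
#[derive(Default)]
pub struct QueueDelayMetrics {
    count: AtomicU64,
    total_micros: AtomicU64,
    max_micros: AtomicU64,
}

impl QueueDelayMetrics {
    pub fn record(&self, queue_time: Duration) {
        let micros = queue_time.as_micros() as u64;
        self.count.fetch_add(1, Ordering::Relaxed);
        self.total_micros.fetch_add(micros, Ordering::Relaxed);
        self.max_micros.fetch_max(micros, Ordering::Relaxed);
    }

    /// Mean queue delay in microseconds, or 0 if nothing has been recorded yet.
    pub fn mean_micros(&self) -> u64 {
        let count = self.count.load(Ordering::Relaxed);
        if count == 0 {
            0
        } else {
            self.total_micros.load(Ordering::Relaxed) / count
        }
    }
}
```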
src/conn.rs
Outdated
```diff
@@ -285,10 +286,31 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
     _ = self.writable.notified() => {
         self.process_writes(Instant::now());
     }
     Some(Ok(timeout)) = self.unacked.next() => {
         let (_seq, packet) = timeout;
         //tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");
```
seems we left this commented out by mistake. was the intention to remove the log?
Ah yes, it was just very noisy during testing. I'll add it back.
src/conn.rs
Outdated
```rust
match self.state {
    State::Closing { local_fin, remote_fin, .. } => {
        if unacked.len() == 1 && local_fin.is_some() && &local_fin.unwrap() == unacked.last().unwrap() {
            tracing::debug!(?unacked, ?local_fin, ?remote_fin, "Idle timeout with both FINs sent, with only FIN unacked.");
        } else {
            tracing::warn!(?unacked, ?local_fin, ?remote_fin, "idle timeout expired while closing...");
        }
    }
    State::Established { .. } => {
        tracing::warn!(?unacked, "idle timeout expired in established connection, closing...");
    }
    State::Connecting { .. } => {
        tracing::warn!(?unacked, "idle timeout expired while connecting, closing...");
    }
    State::Closed { .. } => unreachable!("In an if block that excludes the closed state"),
}
```
instead of all these different `match` arms, it would be nicer to have a compact representation of the state that we can emit alongside the idle timeout
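For example, something like this hypothetical helper, shown here against a simplified stand-in for the real `State` enum:

```rust
// Simplified stand-in; the real `State` variants carry more fields.
#[derive(Debug)]
enum State {
    Connecting,
    Established,
    Closing { local_fin: Option<u16>, remote_fin: Option<u16> },
    Closed,
}

/// Collapse the state into a short string that a single log line can carry.
fn state_summary(state: &State) -> String {
    match state {
        State::Connecting => "connecting".into(),
        State::Established => "established".into(),
        State::Closing { local_fin, remote_fin } => {
            format!("closing(local_fin={local_fin:?}, remote_fin={remote_fin:?})")
        }
        State::Closed => "closed".into(),
    }
}

fn main() {
    let state = State::Closing { local_fin: Some(42), remote_fin: None };
    // One log line with the compact state attached, instead of one arm per variant.
    println!("idle timeout expired (state = {})", state_summary(&state));
}
```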
src/conn.rs
Outdated
```rust
let send_result = self.socket_events
    .send(SocketEvent::Outgoing((state, self.cid.peer.clone())));
if send_result.is_err() {
    tracing::warn!("Cannot send STATE packet: socket_events closed");
    return;
}
```
Suggested change:

```rust
if let Err(err) = self.socket_events.send(SocketEvent::Outgoing((state, self.cid.peer.clone()))) {
    tracing::warn!("cannot send STATE packet: socket closed channel");
    return;
}
```
src/conn.rs
Outdated
```rust
let send_result = socket_events.send(SocketEvent::Outgoing((packet, dest.clone())));
if send_result.is_err() {
    tracing::error!("Cannot transmit packet: socket_events closed");
}
```
Suggested change:

```rust
if let Err(err) = socket_events.send(SocketEvent::Outgoing((packet, dest.clone()))) {
    tracing::error!("cannot transmit packet: socket closed channel");
}
```
this log level is `error` while the similar log above is `warn`. we should probably make both `error`
👍🏻 to unifying the log level. It's fairly obnoxious to have a passing test end with an error log, and it happens pretty regularly, so I'd like to make them both warn.
src/socket.rs
Outdated
```diff
 }

 impl UtpSocket<SocketAddr> {
-    pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
+    pub async fn bind(addr: SocketAddr, max_conns: usize) -> io::Result<Self> {
```
I would prefer this go in a socket-level config since we want to move in that direction anyway #38
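If it does move into a config, it might look roughly like this; the struct, field, and default value are placeholders, not the project's actual API:

```rust
/// Hypothetical socket-level configuration.
#[derive(Clone, Debug)]
pub struct SocketConfig {
    /// Maximum number of concurrent outbound connections.
    pub max_conns: usize,
}

impl Default for SocketConfig {
    fn default() -> Self {
        Self { max_conns: 100 }
    }
}
```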
src/socket.rs
Outdated
```rust
/// Maximum allowed connections, which only restricts outbound connections, but counts all
/// active connections.
max_conns: usize,
```
what is the rationale for restricting only outbound connections?
Hm, this one is a little fuzzy. I suppose that:
- outgoing connections feel delayable until we want to send the data, but inbound connections were presumably initiated by someone else, so we can't just wait as long as we want to accept
- we might have a preference to receive data (helping ourselves) over sending it (helping others), if we are resource constrained
Any thoughts on this? I suppose the alternative would be to add the same pausing mechanism into the accepts.
I think the first point is more important than the second.
if we delay accepting a connection, then the initiating endpoint may give up before we accept, and when we finally do accept we will consume a connection slot until the idle timeout.
uTP streams are bidirectional, and the accepting endpoint of a uTP stream is not necessarily the endpoint receiving data, so I don't think we can make assumptions about one endpoint being the sender and the other being the receiver.
that said, I think it's okay to only limit outgoing connections if we make it clear from the config that it is not total connections. if it becomes evident that an unlimited number of incoming (i.e. accepted) connections is harming outgoing connections, then we can address it with a different approach.
Test error discussed in #103
Now we can reliably handle the launching of many more streams, so let's really stress-test the library. This has helped us identify and resolve a number of bugs in local testing. Now that they seem to be fixed, we can merge to master to prevent regressions.

Bonus cleanups on @njgheorghita's original change:
- Change the high end of the range to equal the number of concurrent streams
- Log showing the transfer speed in the socket test
- Run an optimized build of utp when testing, for more performance (and interesting results) in the high-concurrency tests
- Return from the test as soon as an error is found, cancelling all in-flight transfers
This used to be a PR that grouped up all the changes needed to pass the high-concurrency tests. Now, with all the changes merged, the test is the only commit left, and it passes against master.
Previous writeup, for posterity
Handle High Concurrency
The minimal functional changes required to pass the high-volume socket test, plus some logs, pulled from #69
The commits should be fairly clean to inspect individually, if you're into that kind of thing.
The functional changes are:
Altogether, these changes reliably cause the new test to pass, even when bumped up to 10,000 simultaneously-launched transfers. (Though the simultaneous bit is mostly a red herring now, since the stream prevents them from actually starting to run at the same time.)
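For reference, the bounded-concurrency shape of such a test is roughly the following sketch; `do_transfer`, the counts, and the error type are placeholders for the real per-stream transfer:

```rust
use futures::stream::{self, StreamExt};

// Stand-in for connecting, sending 1 MB, and verifying the echoed data.
async fn do_transfer(id: usize) -> Result<(), String> {
    let _ = id;
    Ok(())
}

#[tokio::main]
async fn main() {
    const NUM_TRANSFERS: usize = 2_000;
    const MAX_IN_FLIGHT: usize = 100;

    // Launch every transfer, but the stream only drives MAX_IN_FLIGHT of them at once.
    let mut results = stream::iter(0..NUM_TRANSFERS)
        .map(do_transfer)
        .buffer_unordered(MAX_IN_FLIGHT);

    // Stop at the first error; dropping the stream cancels the in-flight transfers.
    while let Some(result) = results.next().await {
        if let Err(err) = result {
            panic!("transfer failed: {err}");
        }
    }
}
```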
Most of these changes each improved the probability of success, though the connection limiting was the final piece that gave apparent reliability. It might be tempting to drop some of the other changes and just stick to the connection limit. However, since the connection limit is a configurable value, we want to support high connection rates as much as possible, so it seems worth keeping the earlier functional changes to do a better job if users try high values.
Notably not present is the "stress tracking" or "busy/idle tracking" from the socket. Both provided interesting information, but were not reliable enough to use to apply back-pressure. The added code complexity wasn't worth the (admittedly quite valuable) logs, IMO.
The logging changes are mostly not visible except in stress/failure situations, but they are very helpful for debugging, so I think we should keep them.