Skip to content

Commit

Permalink
Merge pull request #309 from nervosnetwork/fix-yamux-close
Browse files Browse the repository at this point in the history
fix: Fix yamux close
  • Loading branch information
driftluo authored Feb 22, 2021
2 parents 2cdd795 + 6cc45c9 commit 9dec36b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 18 deletions.
3 changes: 0 additions & 3 deletions tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
systemstat = "0.1.3"
futures-test = "0.3.5"
## lock on 1.1
## https://github.com/myrrlyn/funty/issues/3
funty = "=1.1.0"

[target.'cfg(unix)'.dev-dependencies]
nix = "0.13.0"
Expand Down
60 changes: 56 additions & 4 deletions yamux/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures::{
channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
Sink, Stream,
};
use log::{debug, log_enabled, trace, warn};
use log::{debug, log_enabled, trace};
use tokio::prelude::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

Expand Down Expand Up @@ -239,6 +239,12 @@ where
let frame = Frame::new_go_away(code);
self.send_frame(cx, frame)?;
self.local_go_away = true;
let mut new_timer = interval(self.config.connection_write_timeout);
// force registration of new timer to driver
let _ignore = Pin::new(&mut new_timer).as_mut().poll_next(cx);
// Reuse the keepalive timer to set a time out. If remote peer does not respond
// within the time out, consider this session as remote gone away.
self.keepalive = Some(new_timer);
Ok(())
}

Expand Down Expand Up @@ -459,7 +465,10 @@ where
}
} else {
// TODO: stream already closed ?
warn!("substream({}) should exist but not", stream_id);
debug!(
"substream({}) should exist but not, may drop by self",
stream_id
);
false
}
};
Expand Down Expand Up @@ -621,7 +630,13 @@ where
if let Some(ref mut interval) = self.keepalive {
match Pin::new(interval).as_mut().poll_next(cx) {
Poll::Ready(Some(_)) => {
self.keep_alive(cx, Instant::now())?;
if self.local_go_away {
// The remote peer has not responded to our sent go away code.
// Assume that remote peer has gone away and this session should be closed.
self.remote_go_away = true;
} else {
self.keep_alive(cx, Instant::now())?;
}
}
Poll::Ready(None) => {
debug!("yamux::Session poll keepalive interval finished");
Expand Down Expand Up @@ -675,7 +690,14 @@ mod timer {
#[cfg(feature = "generic-timer")]
pub use generic_time::{interval, Interval};
#[cfg(feature = "tokio-timer")]
pub use tokio::time::{interval, Interval};
pub use tokio::time::Interval;

#[cfg(feature = "tokio-timer")]
pub fn interval(period: ::std::time::Duration) -> Interval {
use tokio::time::{self, interval_at};

interval_at(time::Instant::now() + period, period)
}

#[cfg(target_arch = "wasm32")]
pub use wasm_mock::Instant;
Expand Down Expand Up @@ -839,6 +861,7 @@ mod test {
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::{
io::AsyncReadExt,
Expand Down Expand Up @@ -1094,4 +1117,33 @@ mod test {
assert_eq!(reset_msg.stream_id(), 5)
});
}

#[test]
fn test_remote_does_not_respond_go_away() {
let mut rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async {
let (_remote, local) = MockSocket::new();
let mut config = Config::default();
config.enable_keepalive = false;
config.connection_write_timeout = Duration::from_secs(1);

let mut session = Session::new_server(local, config);

let mut control = session.control();
tokio::spawn(async move {
let _ignore = control.close().await;
});

// The purpose of this test is to ensure that if the remote does not respond to the
// go away message, it must be able to actively disconnect the session instead of hanging.
// So, if the test fails to exit, it means there has a problem
while let Some(Ok(mut stream)) = session.next().await {
tokio::spawn(async move {
let mut buf = [0; 100];
let _ignore = stream.read(&mut buf).await;
});
}
});
}
}
61 changes: 50 additions & 11 deletions yamux/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,18 +449,22 @@ impl AsyncWrite for StreamHandle {

impl Drop for StreamHandle {
fn drop(&mut self) {
if !self.unbound_event_sender.is_closed()
&& self.state != StreamState::Closed
&& self.state != StreamState::LocalClosing
{
let mut flags = self.get_flags();
flags.add(Flag::Rst);
let frame = Frame::new_window_update(flags, self.id, 0);
let rst_event = StreamEvent::Frame(frame);
if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed {
let event = StreamEvent::Closed(self.id);
// Always successful unless the session is dropped
let _ignore = self.unbound_event_sender.unbounded_send(rst_event);
let _ignore = self.unbound_event_sender.unbounded_send(event);
if self.state == StreamState::LocalClosing {
// LocalClosing means that local have sent Fin to the remote and waiting for a response.
// So, here only need to send a cleanup message
let _ignore = self.unbound_event_sender.unbounded_send(event);
} else {
let mut flags = self.get_flags();
flags.add(Flag::Rst);
let frame = Frame::new_window_update(flags, self.id, 0);
let rst_event = StreamEvent::Frame(frame);

// Always successful unless the session is dropped
let _ignore = self.unbound_event_sender.unbounded_send(rst_event);
let _ignore = self.unbound_event_sender.unbounded_send(event);
}
}
}
}
Expand Down Expand Up @@ -575,6 +579,41 @@ mod test {
});
}

#[test]
fn test_drop_with_state_local_close() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (_frame_sender, frame_receiver) = channel(2);
let (unbound_sender, mut unbound_receiver) = unbounded();
let mut stream = StreamHandle::new(
0,
unbound_sender,
frame_receiver,
StreamState::Init,
INITIAL_STREAM_WINDOW,
INITIAL_STREAM_WINDOW,
);

let _ignore = stream.shutdown().await;

let event = unbound_receiver.next().await.unwrap();
match event {
StreamEvent::Frame(frame) => {
assert!(frame.flags().contains(Flag::Fin));
assert_eq!(frame.ty(), Type::WindowUpdate);
}
_ => panic!("must be fin window update"),
}

drop(stream);
let event = unbound_receiver.next().await.unwrap();
match event {
StreamEvent::Closed(_) => (),
_ => panic!("must be state closed"),
}
});
}

#[test]
fn test_data_large_than_recv_window() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
Expand Down

0 comments on commit 9dec36b

Please sign in to comment.