Skip to content

Commit

Permalink
First version of client reconnect
Browse files Browse the repository at this point in the history
Issue #5
  • Loading branch information
stepancheg committed May 17, 2017
1 parent d324ef1 commit eb0b9f5
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 46 deletions.
112 changes: 103 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ use bytes::Bytes;

use futures;
use futures::Future;
use futures::stream::Stream;

use tokio_core::reactor;

use native_tls::TlsConnector;

use futures_misc::*;

use error;
use error::Error;
use result::Result;

Expand All @@ -38,7 +40,7 @@ struct LoopToClient {
// used only once to send shutdown signal
shutdown: ShutdownSignal,
_loop_handle: reactor::Remote,
http_conn: Arc<HttpClientConnectionAsync>,
controller_tx: futures::sync::mpsc::UnboundedSender<ControllerCommand>,
}

pub struct Client {
Expand Down Expand Up @@ -137,22 +139,100 @@ impl Client {
}

pub fn dump_state(&self) -> HttpFutureSend<ConnectionStateSnapshot> {
self.loop_to_client.http_conn.dump_state()
let (tx, rx) = futures::sync::oneshot::channel();
// ignore error
drop(self.loop_to_client.controller_tx.send(ControllerCommand::DumpState(tx)));
Box::new(rx.map_err(|_| error::Error::Other("conn died")))
}
}

impl Service for Client {
// TODO: copy-paste with HttpClientConnectionAsync
fn start_request(
&self,
headers: Headers,
body: HttpPartStream)
-> Response
{
debug!("start request {:?}", headers);
self.loop_to_client.http_conn.start_request(headers, body)
let (resp_tx, resp_rx) = futures::sync::mpsc::unbounded();

let start = StartRequestMessage {
headers: headers,
body: body,
resp_tx: resp_tx,
};

if let Err(_) = self.loop_to_client.controller_tx.send(ControllerCommand::StartRequest(start)) {
return Response::err(error::Error::Other("client controller died"));
}

let req_rx = resp_rx.map_err(|()| Error::from(io::Error::new(io::ErrorKind::Other, "req")));

let req_rx = stream_with_eof_and_error(req_rx, || error::Error::Other("client is likely died"));

Response::from_stream(req_rx)
}}

enum ControllerCommand {
_GoAway,
StartRequest(StartRequestMessage),
DumpState(futures::sync::oneshot::Sender<ConnectionStateSnapshot>),
}

struct ControllerState {
handle: reactor::Handle,
socket_addr: SocketAddr,
tls: ClientTlsOption,
conf: ClientConf,
// current connection
conn: Arc<HttpClientConnectionAsync>,
}

impl ControllerState {
fn init_conn(&mut self) {
let (conn, future) = HttpClientConnectionAsync::new(
self.handle.clone(),
&self.socket_addr,
self.tls.clone(),
self.conf.clone());

self.handle.spawn(future.map_err(|e| { warn!("client error: {:?}", e); () }));

self.conn = Arc::new(conn);
}

fn iter(mut self, cmd: ControllerCommand) -> ControllerState {
match cmd {
ControllerCommand::_GoAway => unimplemented!(),
ControllerCommand::StartRequest(start) => {
if let Err(start) = self.conn.start_request_with_resp_sender(start) {
self.init_conn();
if let Err(start) = self.conn.start_request_with_resp_sender(start) {
let err = error::Error::Other("client died and reconnect failed");
// ignore error
drop(start.resp_tx.send(ResultOrEof::Error(err)));
}
}
}
ControllerCommand::DumpState(tx) => {
self.conn.dump_state_with_resp_sender(tx);
}
}
self
}
}

fn controller(init: ControllerState, rx: futures::sync::mpsc::UnboundedReceiver<ControllerCommand>)
-> HttpFuture<()>
{
let rx = rx.map_err(|_| error::Error::Other("channel died"));
let r = rx.fold(init, |state, cmd| {
Ok::<_, error::Error>(state.iter(cmd))
});
let r = r.map(|_| ());
Box::new(r)
}

// Event loop entry point
fn run_client_event_loop(
socket_addr: SocketAddr,
Expand All @@ -161,20 +241,34 @@ fn run_client_event_loop(
send_to_back: mpsc::Sender<LoopToClient>)
{
// Create an event loop.
let mut lp = reactor::Core::new().expect("Core::new");
let mut lp: reactor::Core = reactor::Core::new().expect("Core::new");

// Create a channel to receive shutdown signal.
let (shutdown_signal, shutdown_future) = shutdown_signal();

let (http_conn, http_conn_future) =
HttpClientConnectionAsync::new(lp.handle(), &socket_addr, tls, conf);
let (http_conn, conn_future) =
HttpClientConnectionAsync::new(lp.handle(), &socket_addr, tls.clone(), conf.clone());

lp.handle().spawn(conn_future.map_err(|e| { warn!("client error: {:?}", e); () }));

let init = ControllerState {
handle: lp.handle(),
socket_addr: socket_addr.clone(),
tls: tls,
conf: conf,
conn: Arc::new(http_conn),
};

let (controller_tx, controller_rx) = futures::sync::mpsc::unbounded();

let controller_future = controller(init, controller_rx);

// Send channels back to Http2Client
send_to_back
.send(LoopToClient {
shutdown: shutdown_signal,
_loop_handle: lp.remote(),
http_conn: Arc::new(http_conn),
controller_tx: controller_tx,
})
.expect("send back");

Expand All @@ -187,7 +281,7 @@ fn run_client_event_loop(

// Wait for either completion of connection (i. e. error)
// or shutdown signal.
let done = http_conn_future.join(shutdown_future);
let done = controller_future.join(shutdown_future);

match lp.run(done) {
Ok(_) => {}
Expand Down
60 changes: 36 additions & 24 deletions src/client_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::io;

use error;
use error::ErrorCode;
use error::Error;

Expand Down Expand Up @@ -142,10 +143,10 @@ pub struct HttpClientConnectionAsync {

unsafe impl Sync for HttpClientConnectionAsync {}

struct StartRequestMessage {
headers: Headers,
body: HttpPartStream,
response_handler: futures::sync::mpsc::UnboundedSender<ResultOrEof<HttpStreamPart, Error>>,
pub struct StartRequestMessage {
pub headers: Headers,
pub body: HttpPartStream,
pub resp_tx: futures::sync::mpsc::UnboundedSender<ResultOrEof<HttpStreamPart, Error>>,
}

struct BodyChunkMessage {
Expand All @@ -171,13 +172,13 @@ enum ClientCommandMessage {

impl<I : AsyncRead + AsyncWrite + Send + 'static> ClientWriteLoop<I> {
fn process_start(self, start: StartRequestMessage) -> HttpFuture<Self> {
let StartRequestMessage { headers, body, response_handler } = start;
let StartRequestMessage { headers, body, resp_tx } = start;

let stream_id = self.inner.with(move |inner: &mut ClientInner| {

let mut stream = HttpClientStream {
common: HttpStreamCommon::new(inner.common.conn.peer_settings.initial_window_size),
response_handler: Some(response_handler),
response_handler: Some(resp_tx),
};

stream.common.outgoing.push_back(HttpStreamPartContent::Headers(headers));
Expand Down Expand Up @@ -369,44 +370,55 @@ impl HttpClientConnectionAsync {
HttpClientConnectionAsync::connected(lh, Box::new(tls_conn), conf)
}

pub fn start_request_with_resp_sender(
&self,
start: StartRequestMessage)
-> Result<(), StartRequestMessage>
{
self.call_tx.send(ClientToWriteMessage::Start(start))
.map_err(|send_error| {
match send_error.into_inner() {
ClientToWriteMessage::Start(start) => start,
_ => unreachable!(),
}
})
}

pub fn start_request(
&self,
headers: Headers,
body: HttpPartStream)
-> Response
{
let (tx, rx) = futures::oneshot();
let (resp_tx, resp_rx) = futures::sync::mpsc::unbounded();

let call_tx = self.call_tx.clone();

let (req_tx, req_rx) = futures::sync::mpsc::unbounded();

if let Err(_) = call_tx.send(ClientToWriteMessage::Start(StartRequestMessage {
let start = StartRequestMessage {
headers: headers,
body: body,
response_handler: req_tx,
})) {
return Response::err(Error::Other("client died"));
resp_tx: resp_tx,
};

if let Err(_) = self.start_request_with_resp_sender(start) {
return Response::err(error::Error::Other("client died"));
}

let req_rx = req_rx.map_err(|()| Error::from(io::Error::new(io::ErrorKind::Other, "req")));
let req_rx = resp_rx.map_err(|()| Error::from(io::Error::new(io::ErrorKind::Other, "req")));

// TODO: future is no longer needed here
if let Err(_) = tx.send(stream_with_eof_and_error(req_rx, || Error::from(io::Error::new(io::ErrorKind::Other, "client is likely died")))) {
return Response::err(Error::from(io::Error::new(io::ErrorKind::Other, "oneshot canceled")));
}
let req_rx = stream_with_eof_and_error(req_rx, || error::Error::Other("client is likely died"));

let rx = rx.map_err(|_| Error::from(io::Error::new(io::ErrorKind::Other, "oneshot canceled")));
Response::from_stream(req_rx)
}

Response::from_stream(rx.flatten_stream())
pub fn dump_state_with_resp_sender(&self, tx: futures::sync::oneshot::Sender<ConnectionStateSnapshot>) {
// ignore error
drop(self.command_tx.send(ClientCommandMessage::DumpState(tx)));
}

/// For tests
pub fn dump_state(&self) -> HttpFutureSend<ConnectionStateSnapshot> {
let (tx, rx) = futures::oneshot();

self.command_tx.clone().send(ClientCommandMessage::DumpState(tx))
.expect("send request to dump state");
self.dump_state_with_resp_sender(tx);

let rx = rx.map_err(|_| Error::from(io::Error::new(io::ErrorKind::Other, "oneshot canceled")));

Expand Down
1 change: 1 addition & 0 deletions src/client_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use native_tls::TlsConnector;

use solicit::HttpScheme;

#[derive(Clone)]
pub enum ClientTlsOption {
Plain,
Tls(String, Arc<TlsConnector>), // domain
Expand Down
45 changes: 45 additions & 0 deletions tests/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::str;
use std::thread;
use std::time::Duration;

extern crate bytes;
extern crate httpbis;
Expand Down Expand Up @@ -128,3 +130,46 @@ fn client_call_dropped() {
let state: ConnectionStateSnapshot = client.dump_state().wait().expect("state");
assert_eq!(0, state.streams.len(), "{:?}", state);
}

#[test]
fn reconnect() {
env_logger::init().ok();

let server = HttpServerTester::new();

let client: Client =
Client::new("::1", server.port(), false, Default::default()).expect("connect");

let mut server_tester = server.accept();
server_tester.recv_preface();
server_tester.settings_xchg();

{
let req = client.start_get("/111", "localhost").collect();
server_tester.recv_message(1);
server_tester.send_headers(1, Headers::ok_200(), true);
let resp = req.wait().expect("OK");
assert_eq!(200, resp.headers.status());
}

// drop server connection
drop(server_tester);

// waiting for client connection to die
while let Ok(_) = client.dump_state().wait() {
thread::sleep(Duration::from_millis(1));
}

{
let req = client.start_get("/222", "localhost").collect();

let mut server_tester = server.accept();
server_tester.recv_preface();
server_tester.settings_xchg_but_ack();

server_tester.recv_message(1);
server_tester.send_headers(1, Headers::ok_200(), true);
let resp = req.wait().expect("OK");
assert_eq!(200, resp.headers.status());
}
}
Loading

0 comments on commit eb0b9f5

Please sign in to comment.