From 57c052b59f4cd39b349e93736641958ca596961d Mon Sep 17 00:00:00 2001 From: Stepan Koltsov Date: Sun, 28 Oct 2018 19:26:18 +0000 Subject: [PATCH] Use Sink API instead of Stream for client Issue #34 --- httpbis-test/src/tester.rs | 18 ++++ httpbis-test/tests/client.rs | 150 +++++++++++++++++++++++++++++++++- src/bin/client_server_loop.rs | 20 ++--- src/client/client_conn.rs | 101 ++++++++++++++++------- src/client/client_sender.rs | 92 +++++++++++++++++++++ src/client/mod.rs | 71 +++++++++++----- src/common/conn.rs | 29 ------- src/common/mod.rs | 4 +- src/common/window_size.rs | 1 + src/data_or_trailers.rs | 3 +- src/lib.rs | 2 + src/service.rs | 6 +- 12 files changed, 392 insertions(+), 105 deletions(-) create mode 100644 src/client/client_sender.rs diff --git a/httpbis-test/src/tester.rs b/httpbis-test/src/tester.rs index 08ca689d..efc3f25a 100644 --- a/httpbis-test/src/tester.rs +++ b/httpbis-test/src/tester.rs @@ -413,6 +413,24 @@ impl HttpConnTester { assert!(data.is_empty()); } + pub fn recv_frames_data_check( + &mut self, + stream_id: StreamId, + frame_size: usize, + total_size: usize, + end: bool, + ) -> Vec { + let mut r = Vec::new(); + while r.len() != total_size { + let expect_frame_end = end && (r.len() + frame_size >= total_size); + let frame = self.recv_frame_data_check(stream_id, expect_frame_end); + assert!(frame.len() == frame_size || r.len() + frame.len() == total_size); + r.extend_from_slice(&frame[..]); + } + assert_eq!(total_size, r.len()); + r + } + /// Receive at most two frames till END_STREAM frame /// * if first frame has not END_STREAM, second must have empty payload pub fn recv_frame_data_tail(&mut self, stream_id: StreamId) -> Vec { diff --git a/httpbis-test/tests/client.rs b/httpbis-test/tests/client.rs index e7a6fa57..cde39996 100644 --- a/httpbis-test/tests/client.rs +++ b/httpbis-test/tests/client.rs @@ -23,6 +23,9 @@ use futures::sync::oneshot; use tokio_core::reactor; +use futures::executor; +use futures::future; +use futures::Async; use httpbis::for_test::solicit::DEFAULT_SETTINGS; use httpbis::for_test::*; use httpbis::ErrorCode; @@ -218,8 +221,7 @@ pub fn issue_89() { let r1 = client.start_get("/r1", "localhost"); - server_tester.recv_frame_headers_check(1, false); - assert!(server_tester.recv_frame_data_tail(1).is_empty()); + server_tester.recv_frame_headers_check(1, true); server_tester.send_headers(1, Headers::ok_200(), false); let (_, resp1) = r1.0.wait().unwrap(); @@ -284,3 +286,147 @@ fn external_event_loop() { t.join().expect("join"); } + +#[test] +pub fn sink_poll() { + init_logger(); + + let (mut server_tester, client) = HttpConnTester::new_server_with_client_xchg(); + + let (mut sender, _response) = client + .start_post_sink("/foo", "sink") + .wait() + .expect("start_post_sink"); + + server_tester.recv_frame_headers_check(1, false); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(65535, client.conn_state().out_window_size); + assert_eq!(65535, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(65535, client.stream_state(1).pump_out_window_size); + + assert_eq!(Ok(Async::Ready(())), sender.poll()); + + let b = Bytes::from(vec![1; 65_535]); + sender.send_data(b.clone()).expect("send_data"); + + assert_eq!( + b, + Bytes::from(server_tester.recv_frames_data_check(1, 16_384, 65_535, false)) + ); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(0, client.conn_state().out_window_size); + assert_eq!(0, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(0, client.stream_state(1).out_window_size); + assert_eq!(0, client.stream_state(1).pump_out_window_size); + + let mut sender = executor::spawn(future::lazy(move || { + assert_eq!(Ok(Async::NotReady), sender.poll()); + future::ok::<_, ()>(sender) + })).wait_future() + .unwrap(); + + server_tester.send_window_update_conn(3); + server_tester.send_window_update_stream(1, 5); + + sender.wait().unwrap(); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(3, client.conn_state().out_window_size); + assert_eq!(3, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(5, client.stream_state(1).out_window_size); + assert_eq!(5, client.stream_state(1).pump_out_window_size); + + let b = Bytes::from(vec![11, 22]); + sender.send_data(b.clone()).expect("send_data"); + assert_eq!( + b, + Bytes::from(server_tester.recv_frame_data_check(1, false)) + ); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(1, client.conn_state().out_window_size); + assert_eq!(1, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(3, client.stream_state(1).out_window_size); + assert_eq!(3, client.stream_state(1).pump_out_window_size); + + sender.wait().unwrap(); + + let b = Bytes::from(vec![33, 44]); + sender.send_data(b.clone()).expect("send_data"); + assert_eq!( + b.slice(0, 1), + Bytes::from(server_tester.recv_frame_data_check(1, false)) + ); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(0, client.conn_state().out_window_size); + assert_eq!(-1, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(2, client.stream_state(1).out_window_size); + assert_eq!(1, client.stream_state(1).pump_out_window_size); +} + +#[test] +fn sink_reset_by_peer() { + init_logger(); + + let (mut server_tester, client) = HttpConnTester::new_server_with_client_xchg(); + + let (mut sender, _response) = client + .start_post_sink("/foo", "sink") + .wait() + .expect("start_post_sink"); + + server_tester.recv_frame_headers_check(1, false); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(65535, client.conn_state().out_window_size); + assert_eq!(65535, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(65535, client.stream_state(1).out_window_size); + assert_eq!(65535, client.stream_state(1).pump_out_window_size); + + assert_eq!(Ok(Async::Ready(())), sender.poll()); + + let b = Bytes::from(vec![1; 65_535 * 2]); + sender.send_data(b.clone()).expect("send_data"); + + assert_eq!( + b.slice(0, 65_535), + Bytes::from(server_tester.recv_frames_data_check(1, 16_384, 65_535, false)) + ); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(0, client.conn_state().out_window_size); + assert_eq!(-65535, client.conn_state().pump_out_window_size); + assert_eq!(65535, client.stream_state(1).in_window_size); + assert_eq!(0, client.stream_state(1).out_window_size); + assert_eq!(-65535, client.stream_state(1).pump_out_window_size); + + server_tester.send_rst(1, ErrorCode::Cancel); + + while client.conn_state().streams.len() != 0 { + // spin-wait + } + + // pump out window must be reset to out window + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(0, client.conn_state().out_window_size); + assert_eq!(0, client.conn_state().pump_out_window_size); + + // check that if more data is sent, pump_out_window_size is not exhausted + + let b = Bytes::from(vec![1; 100_000]); + sender.send_data(b.clone()).expect("send_data"); + + assert_eq!(65535, client.conn_state().in_window_size); + assert_eq!(0, client.conn_state().out_window_size); + assert_eq!(0, client.conn_state().pump_out_window_size); +} diff --git a/src/bin/client_server_loop.rs b/src/bin/client_server_loop.rs index 28894215..868ead7b 100644 --- a/src/bin/client_server_loop.rs +++ b/src/bin/client_server_loop.rs @@ -5,7 +5,6 @@ extern crate httpbis; use bytes::Bytes; use futures::future::Future; use futures::stream::Stream; -use futures::sync::mpsc; use httpbis::Client; use httpbis::Headers; use httpbis::HttpStreamAfterHeaders; @@ -95,19 +94,16 @@ fn ping_pong() { Default::default(), ).expect("client"); - let (request_tx, request_rx) = mpsc::unbounded(); - - let (header, body) = client - .start_post_stream( - "/any", - "localhost", - request_rx.map_err(|()| httpbis::Error::Other("other")), - ).wait() + let (mut sender, response) = client + .start_post_sink("/any", "localhost") + .wait() .expect("request"); + let (header, response) = response.wait().expect("response wait"); + assert_eq!(200, header.status()); - let body = body.filter_data(); + let body = response.filter_data(); let mut body = body.wait(); let mut i = 0u32; @@ -115,9 +111,7 @@ fn ping_pong() { i = i.wrapping_add(1); let mut req = Vec::new(); req.resize(BLOCK_SIZE, i as u8); - request_tx - .unbounded_send(Bytes::from(req)) - .expect("send error"); + sender.send_data(Bytes::from(req)).expect("send_data"); let mut read = 0; while read < BLOCK_SIZE { diff --git a/src/client/client_conn.rs b/src/client/client_conn.rs index ab00437a..3b2a7c14 100644 --- a/src/client/client_conn.rs +++ b/src/client/client_conn.rs @@ -16,8 +16,6 @@ use solicit::header::*; use solicit::StreamId; use solicit::DEFAULT_SETTINGS; -use service::Service; - use futures::future::Future; use futures::stream::Stream; use futures::sync::mpsc::unbounded; @@ -35,13 +33,16 @@ use tokio_tls_api; use solicit_async::*; use common::*; -use data_or_trailers::*; use socket::*; +use bytes::Bytes; +use client::client_sender::ClientSender; +use client::ClientInterface; use client_died_error_holder::ClientDiedErrorHolder; use common::client_or_server::ClientOrServer; use data_or_headers::DataOrHeaders; use data_or_headers_with_flag::DataOrHeadersWithFlag; +use futures::future; use headers_place::HeadersPlace; use req_resp::RequestOrResponse; use result_or_eof::ResultOrEof; @@ -94,12 +95,19 @@ unsafe impl Sync for ClientConn {} pub struct StartRequestMessage { pub headers: Headers, - pub body: HttpStreamAfterHeaders, - pub resp_tx: oneshot::Sender, + pub body: Option, + pub trailers: Option, + pub end_stream: bool, + pub resp_tx: oneshot::Sender>, +} + +pub struct ClientStartRequestMessage { + start: StartRequestMessage, + write_tx: UnboundedSender, } -enum ClientToWriteMessage { - Start(StartRequestMessage), +pub(crate) enum ClientToWriteMessage { + Start(ClientStartRequestMessage), WaitForHandshake(oneshot::Sender>), Common(CommonToWriteMessage), } @@ -133,16 +141,22 @@ impl Conn> where I: AsyncWrite + AsyncRead + Send + 'static, { - fn process_start(&mut self, start: StartRequestMessage) -> result::Result<()> { - let StartRequestMessage { - headers, - body, - resp_tx, + fn process_start(&mut self, start: ClientStartRequestMessage) -> result::Result<()> { + let ClientStartRequestMessage { + start: + StartRequestMessage { + headers, + body, + trailers, + end_stream, + resp_tx, + }, + write_tx, } = start; let stream_id = self.next_local_stream_id(); - let out_window = { + { let (mut http_stream, resp_stream, out_window) = self.new_stream_data( stream_id, None, @@ -150,16 +164,30 @@ where ClientStreamData {}, ); - if let Err(_) = resp_tx.send(Response::from_stream(resp_stream)) { + let r = Ok(( + ClientSender { + stream_id, + write_tx, + out_window, + }, + Response::from_stream(resp_stream), + )); + + if let Err(_) = resp_tx.send(r) { warn!("caller died"); } http_stream.push_back(DataOrHeaders::Headers(headers)); - - out_window - }; - - self.pump_stream_to_write_loop(stream_id, body.into_part_stream(), out_window); + if let Some(body) = body { + http_stream.push_back(DataOrHeaders::Data(body)); + } + if let Some(trailers) = trailers { + http_stream.push_back(DataOrHeaders::Headers(trailers)); + } + if end_stream { + http_stream.close_outgoing(ErrorCode::NoError); + } + } // Also opens latch if necessary self.buffer_outg_conn()?; @@ -322,10 +350,15 @@ impl ClientConn { &self, start: StartRequestMessage, ) -> Result<(), StartRequestMessage> { + let client_start = ClientStartRequestMessage { + start, + write_tx: self.write_tx.clone(), + }; + self.write_tx - .unbounded_send(ClientToWriteMessage::Start(start)) + .unbounded_send(ClientToWriteMessage::Start(client_start)) .map_err(|send_error| match send_error.into_inner() { - ClientToWriteMessage::Start(start) => start, + ClientToWriteMessage::Start(start) => start.start, _ => unreachable!(), }) } @@ -362,29 +395,35 @@ impl ClientConn { } } -impl Service for ClientConn { +impl ClientInterface for ClientConn { // TODO: copy-paste with Client::start_request - fn start_request(&self, headers: Headers, body: HttpStreamAfterHeaders) -> Response { + fn start_request( + &self, + headers: Headers, + body: Option, + trailers: Option, + end_stream: bool, + ) -> HttpFutureSend<(ClientSender, Response)> { let (resp_tx, resp_rx) = oneshot::channel(); let start = StartRequestMessage { - headers: headers, - body: body, - resp_tx: resp_tx, + headers, + body, + trailers, + end_stream, + resp_tx, }; if let Err(_) = self.start_request_with_resp_sender(start) { - return Response::err(error::Error::Other("client died")); + return Box::new(future::err(error::Error::Other("client died"))); } let resp_rx = resp_rx.map_err(|oneshot::Canceled| error::Error::Other("client likely died")); - let resp_rx = resp_rx.map(|r| r.into_stream_flag()); - - let resp_rx = resp_rx.flatten_stream(); + let resp_rx = resp_rx.map_err(move |_| error::Error::Other("TODO")); - Response::from_stream(resp_rx) + Box::new(resp_rx.flatten()) } } diff --git a/src/client/client_sender.rs b/src/client/client_sender.rs new file mode 100644 index 00000000..ef0c8cb6 --- /dev/null +++ b/src/client/client_sender.rs @@ -0,0 +1,92 @@ +use bytes::Bytes; +use client::client_conn::ClientToWriteMessage; +use common::conn_write::CommonToWriteMessage; +use common::window_size::StreamDead; +use common::window_size::StreamOutWindowReceiver; +use data_or_headers::DataOrHeaders; +use data_or_headers_with_flag::DataOrHeadersWithFlag; +use error; +use futures::future; +use futures::sync::mpsc; +use futures::sync::mpsc::UnboundedSender; +use futures::Future; +use futures::Poll; +use result; +use solicit::StreamId; +use ErrorCode; +use Headers; + +pub struct ClientSender { + pub(crate) stream_id: StreamId, + pub(crate) write_tx: UnboundedSender, + pub(crate) out_window: StreamOutWindowReceiver, +} + +impl ClientSender { + pub fn poll(&mut self) -> Poll<(), StreamDead> { + self.out_window.poll() + } + + pub fn wait(&mut self) -> Result<(), StreamDead> { + future::poll_fn(|| self.poll()).wait() + } + + fn send_common(&mut self, message: CommonToWriteMessage) -> result::Result<()> { + // TODO: why client died? + self.write_tx + .unbounded_send(ClientToWriteMessage::Common(message)) + .map_err(|_: mpsc::SendError<_>| error::Error::ClientDied(None)) + } + + pub fn send_data(&mut self, data: Bytes) -> result::Result<()> { + let stream_id = self.stream_id; + self.out_window.decrease(data.len()); + self.send_common(CommonToWriteMessage::StreamEnqueue( + stream_id, + DataOrHeadersWithFlag { + content: DataOrHeaders::Data(data), + last: false, + }, + )) + } + + pub fn send_data_end_of_stream(&mut self, data: Bytes) -> result::Result<()> { + let stream_id = self.stream_id; + self.out_window.decrease(data.len()); + self.send_common(CommonToWriteMessage::StreamEnqueue( + stream_id, + DataOrHeadersWithFlag { + content: DataOrHeaders::Data(data), + last: true, + }, + )) + } + + pub fn send_trailers(&mut self, trailers: Headers) -> result::Result<()> { + let stream_id = self.stream_id; + self.send_common(CommonToWriteMessage::StreamEnqueue( + stream_id, + DataOrHeadersWithFlag { + content: DataOrHeaders::Headers(trailers), + last: true, + }, + )) + } + + pub fn reset(&mut self, error_code: ErrorCode) -> result::Result<()> { + // TODO: do nothing if stream is explicitly closed + let stream_id = self.stream_id; + self.send_common(CommonToWriteMessage::StreamEnd(stream_id, error_code)) + } + + pub fn close(&mut self) -> result::Result<()> { + self.reset(ErrorCode::NoError) + } +} + +impl Drop for ClientSender { + fn drop(&mut self) { + // Not sure correct code + drop(self.reset(ErrorCode::Cancel)) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index fed83fc8..eda18fd7 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,6 @@ pub mod client_conf; pub mod client_conn; +pub mod client_sender; pub mod client_tls; use std::net::SocketAddr; @@ -11,7 +12,6 @@ use bytes::Bytes; use futures::future; use futures::future::Future; -use futures::stream; use futures::stream::Stream; use futures::sync::mpsc::unbounded; use futures::sync::mpsc::UnboundedReceiver; @@ -39,8 +39,6 @@ use solicit_async::*; use client_died_error_holder::*; use common::*; -use data_or_trailers::*; -use service::Service; use socket::AnySocketAddr; use socket::ToClientStream; @@ -48,6 +46,7 @@ use client::client_conf::ClientConf; use client::client_conn::ClientConn; use client::client_conn::ClientConnCallbacks; use client::client_conn::StartRequestMessage; +use client::client_sender::ClientSender; pub use client::client_tls::ClientTlsOption; /// Builder for HTTP/2 client. @@ -262,9 +261,16 @@ impl Client { client.build() } - /// Start HTTP/2 request. - pub fn start_request_simple(&self, headers: Headers, body: Bytes) -> Response { - self.start_request(headers, HttpStreamAfterHeaders::once_bytes(body)) + pub fn start_request_end_stream( + &self, + headers: Headers, + body: Option, + trailers: Option, + ) -> Response { + Response::new( + self.start_request(headers, body, trailers, true) + .and_then(move |(_sender, response)| response), + ) } /// Start HTTP/2 `GET` request. @@ -275,27 +281,32 @@ impl Client { Header::new(":authority", authority.to_owned()), Header::new(":scheme", self.http_scheme.as_bytes()), ]); - self.start_request_simple(headers, Bytes::new()) + self.start_request_end_stream(headers, None, None) } /// Start HTTP/2 `POST` request. pub fn start_post(&self, path: &str, authority: &str, body: Bytes) -> Response { - self.start_post_stream(path, authority, stream::once(Ok(body))) + let headers = Headers(vec![ + Header::new(":method", "POST"), + Header::new(":path", path.to_owned()), + Header::new(":authority", authority.to_owned()), + Header::new(":scheme", self.http_scheme.as_bytes()), + ]); + self.start_request_end_stream(headers, Some(body), None) } - pub fn start_post_stream( + pub fn start_post_sink( &self, path: &str, authority: &str, - body: impl Stream + Send + 'static, - ) -> Response { + ) -> HttpFutureSend<(ClientSender, Response)> { let headers = Headers(vec![ Header::new(":method", "POST"), Header::new(":path", path.to_owned()), Header::new(":authority", authority.to_owned()), Header::new(":scheme", self.http_scheme.as_bytes()), ]); - self.start_request(headers, HttpStreamAfterHeaders::bytes(body)) + self.start_request(headers, None, None, false) } /// For tests @@ -326,14 +337,33 @@ impl Client { } } -impl Service for Client { - // TODO: copy-paste with ClientConnection::start_request - fn start_request(&self, headers: Headers, body: HttpStreamAfterHeaders) -> Response { +pub trait ClientInterface { + /// Start HTTP/2 request. + fn start_request( + &self, + headers: Headers, + body: Option, + trailers: Option, + end_stream: bool, + ) -> HttpFutureSend<(ClientSender, Response)>; +} + +impl ClientInterface for Client { + // TODO: copy-paste with ClientConn::start_request + fn start_request( + &self, + headers: Headers, + body: Option, + trailers: Option, + end_stream: bool, + ) -> HttpFutureSend<(ClientSender, Response)> { let (resp_tx, resp_rx) = oneshot::channel(); let start = StartRequestMessage { headers, body, + trailers, + end_stream, resp_tx, }; @@ -341,17 +371,14 @@ impl Service for Client { .controller_tx .unbounded_send(ControllerCommand::StartRequest(start)) { - return Response::err(error::Error::Other("client controller died")); + // TODO: named error + return Box::new(future::err(error::Error::Other("client controller died"))); } let client_error = self.client_died_error_holder.clone(); let resp_rx = resp_rx.map_err(move |oneshot::Canceled| client_error.error()); - let resp_rx = resp_rx.map(|r| r.into_stream_flag()); - - let resp_rx = resp_rx.flatten_stream(); - - Response::from_stream(resp_rx) + Box::new(resp_rx.flatten()) } } @@ -398,7 +425,7 @@ impl ControllerState if let Err(start) = self.conn.start_request_with_resp_sender(start) { let err = error::Error::Other("client died and reconnect failed"); // ignore error - if let Err(_) = start.resp_tx.send(Response::err(err)) { + if let Err(_) = start.resp_tx.send(Err(err)) { debug!("called likely died"); } } diff --git a/src/common/conn.rs b/src/common/conn.rs index 7374a064..54fa77e4 100644 --- a/src/common/conn.rs +++ b/src/common/conn.rs @@ -20,7 +20,6 @@ use solicit::DEFAULT_SETTINGS; use super::closed_streams::*; use super::conf::*; -use super::pump_stream_to_write_loop::PumpStreamToWrite; use super::stream::*; use super::stream_from_network::StreamFromNetwork; use super::stream_map::*; @@ -39,7 +38,6 @@ use common::conn_read::ConnReadSideCustom; use common::conn_write::ConnWriteSideCustom; use common::init_where::InitWhere; use common::iteration_exit::IterationExit; -use data_or_headers_with_flag::DataOrHeadersWithFlagStream; use futures::future; use futures::sync::oneshot; use futures::task; @@ -439,33 +437,6 @@ where goaway && no_streams } - pub fn new_pump_stream_to_write_loop( - &self, - stream_id: StreamId, - stream: DataOrHeadersWithFlagStream, - out_window: window_size::StreamOutWindowReceiver, - ) -> PumpStreamToWrite { - let stream = stream.catch_unwind(); - PumpStreamToWrite { - to_write_tx: self.to_write_tx.clone(), - stream_id: stream_id, - out_window: out_window, - stream: stream, - } - } - - pub fn pump_stream_to_write_loop( - &self, - stream_id: StreamId, - stream: DataOrHeadersWithFlagStream, - out_window: window_size::StreamOutWindowReceiver, - ) { - let stream = stream.catch_unwind(); - self.exec.execute(Box::new( - self.new_pump_stream_to_write_loop(stream_id, stream, out_window), - )); - } - pub fn increase_in_window(&mut self, stream_id: StreamId, increase: u32) -> result::Result<()> { if let Some(mut stream) = self.streams.get_mut(stream_id) { if let Err(_) = stream.stream().in_window_size.try_increase(increase) { diff --git a/src/common/mod.rs b/src/common/mod.rs index 40d3d93b..021052c9 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -6,7 +6,7 @@ mod closed_streams; mod conf; mod conn; mod conn_read; -mod conn_write; +pub mod conn_write; mod hash_set_shallow_clone; pub mod init_where; mod iteration_exit; @@ -18,7 +18,7 @@ mod stream_queue; pub mod stream_queue_sync; mod types; pub mod waiters; -mod window_size; +pub mod window_size; pub use self::closed_streams::*; pub use self::conf::*; diff --git a/src/common/window_size.rs b/src/common/window_size.rs index d2806d49..070a67c8 100644 --- a/src/common/window_size.rs +++ b/src/common/window_size.rs @@ -131,6 +131,7 @@ impl StreamOutWindowSender { struct ConnDead; +#[derive(Eq, PartialEq, Debug)] pub enum StreamDead { Stream, Conn, diff --git a/src/data_or_trailers.rs b/src/data_or_trailers.rs index 25375392..2fd0a77e 100644 --- a/src/data_or_trailers.rs +++ b/src/data_or_trailers.rs @@ -121,7 +121,8 @@ impl HttpStreamAfterHeaders { self.0.map(DataOrTrailers::into_part) } - pub(crate) fn into_part_stream(self) -> DataOrHeadersWithFlagStream { + // TODO: drop + pub(crate) fn _into_part_stream(self) -> DataOrHeadersWithFlagStream { DataOrHeadersWithFlagStream::new(self.into_flag_stream()) } diff --git a/src/lib.rs b/src/lib.rs index d222b425..1eafad48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,9 +77,11 @@ pub use service_paths::ServicePaths; pub use exec::CpuPoolOption; pub use client::client_conf::ClientConf; +pub use client::client_sender::ClientSender; pub use client::client_tls::ClientTlsOption; pub use client::Client; pub use client::ClientBuilder; +pub use common::window_size::StreamDead; pub use server::server_conf::ServerAlpn; pub use server::server_conf::ServerConf; diff --git a/src/service.rs b/src/service.rs index 4d7dbbe2..27f66a8a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -4,11 +4,7 @@ use solicit::header::Headers; /// Central HTTP/2 service interface. /// -/// This trait is used by both client and server. -/// -/// Client API simply implements this trait. -/// -/// Server implementation calls implementation of this trait providede by user. +/// This trait should be implemented by server. pub trait Service: Send + Sync + 'static { /// Start HTTP/2 request. ///