diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index 97b77ba..6cdba16 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -71,8 +71,8 @@ impl<'a> StreamBodyAs<'a> { FMT: StreamingFormat, S: Stream + 'a + Send, { - match options.buffering_ready_items { - Some(buffering_ready_items) => stream_format + match (options.buffering_ready_items, options.buffering_bytes) { + (Some(buffering_ready_items), _) => stream_format .to_bytes_stream(Box::pin(stream)) .ready_chunks(buffering_ready_items) .map(|chunks| { @@ -83,7 +83,40 @@ impl<'a> StreamBodyAs<'a> { Ok(Frame::data(buf.freeze())) }) .boxed(), - None => stream_format + (_, Some(buffering_bytes)) => { + let bytes_stream = + stream_format + .to_bytes_stream(Box::pin(stream)) + .chain(futures::stream::once(futures::future::ready(Ok( + bytes::Bytes::new(), + )))); + + bytes_stream + .scan( + BytesMut::with_capacity(buffering_bytes), + move |current_buffer, maybe_bytes| { + futures::future::ready(match maybe_bytes { + Ok(bytes) if bytes.is_empty() => { + Some(vec![Ok(Frame::data(current_buffer.split().freeze()))]) + } + Ok(bytes) => { + let mut frames = Vec::new(); + current_buffer.extend_from_slice(&bytes); + while current_buffer.len() >= buffering_bytes { + let buffer = + current_buffer.split_to(buffering_bytes).freeze(); + frames.push(Ok(Frame::data(buffer))); + } + Some(frames) + } + Err(_) => None, + }) + }, + ) + .flat_map(|res| futures::stream::iter(res).boxed()) + .boxed() + } + (None, None) => stream_format .to_bytes_stream(Box::pin(stream)) .map(|res| res.map(Frame::data)) .boxed(), @@ -116,12 +149,14 @@ impl<'a> HttpBody for StreamBodyAs<'a> { pub struct StreamBodyAsOptions { pub buffering_ready_items: Option, + pub buffering_bytes: Option, } impl StreamBodyAsOptions { pub fn new() -> Self { Self { buffering_ready_items: None, + buffering_bytes: None, } } @@ -129,6 +164,11 @@ impl StreamBodyAsOptions { self.buffering_ready_items = Some(ready_items); self } + + pub fn buffering_bytes(mut self, ready_bytes: usize) -> Self { + self.buffering_bytes = Some(ready_bytes); + self + } } #[cfg(test)] @@ -164,7 +204,7 @@ mod tests { } #[tokio::test] - async fn test_stream_body_as_buffering_item() { + async fn test_stream_body_as_buffering_items() { let stream = futures::stream::iter(vec![ "First".to_string(), "Second".to_string(), @@ -187,4 +227,33 @@ mod tests { assert_eq!(data[0], Bytes::from("FirstSecond")); assert_eq!(data[1], Bytes::from("Third")); } + + #[tokio::test] + async fn test_stream_body_as_buffering_bytes() { + let stream = futures::stream::iter(vec![ + "First".to_string(), + "Second".to_string(), + "Third".to_string(), + ]) + .boxed(); + let stream_body_as = StreamBodyAs::with_options( + TextStreamFormat::new(), + stream, + StreamBodyAsOptions::new().buffering_bytes(3), + ); + let response = stream_body_as.into_response(); + assert_eq!( + response.headers().get(http::header::CONTENT_TYPE).unwrap(), + "text/plain; charset=utf-8" + ); + let read = response.into_body().into_data_stream(); + let data: Vec = read.try_collect().await.unwrap(); + assert_eq!(data.len(), 6); + assert_eq!(data[0], Bytes::from("Fir")); + assert_eq!(data[1], Bytes::from("stS")); + assert_eq!(data[2], Bytes::from("eco")); + assert_eq!(data[3], Bytes::from("ndT")); + assert_eq!(data[4], Bytes::from("hir")); + assert_eq!(data[5], Bytes::from("d")); + } }