Skip to content

Commit

Permalink
Buffering window in bytes support (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence authored Apr 14, 2024
1 parent d31c87b commit de9b19a
Showing 1 changed file with 73 additions and 4 deletions.
77 changes: 73 additions & 4 deletions src/stream_body_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl<'a> StreamBodyAs<'a> {
FMT: StreamingFormat<T>,
S: Stream<Item = T> + 'a + Send,
{
match options.buffering_ready_items {
Some(buffering_ready_items) => stream_format
match (options.buffering_ready_items, options.buffering_bytes) {
(Some(buffering_ready_items), _) => stream_format
.to_bytes_stream(Box::pin(stream))
.ready_chunks(buffering_ready_items)
.map(|chunks| {
Expand All @@ -83,7 +83,40 @@ impl<'a> StreamBodyAs<'a> {
Ok(Frame::data(buf.freeze()))
})
.boxed(),
None => stream_format
(_, Some(buffering_bytes)) => {
let bytes_stream =
stream_format
.to_bytes_stream(Box::pin(stream))
.chain(futures::stream::once(futures::future::ready(Ok(
bytes::Bytes::new(),
))));

bytes_stream
.scan(
BytesMut::with_capacity(buffering_bytes),
move |current_buffer, maybe_bytes| {
futures::future::ready(match maybe_bytes {
Ok(bytes) if bytes.is_empty() => {
Some(vec![Ok(Frame::data(current_buffer.split().freeze()))])
}
Ok(bytes) => {
let mut frames = Vec::new();
current_buffer.extend_from_slice(&bytes);
while current_buffer.len() >= buffering_bytes {
let buffer =
current_buffer.split_to(buffering_bytes).freeze();
frames.push(Ok(Frame::data(buffer)));
}
Some(frames)
}
Err(_) => None,
})
},
)
.flat_map(|res| futures::stream::iter(res).boxed())
.boxed()
}
(None, None) => stream_format
.to_bytes_stream(Box::pin(stream))
.map(|res| res.map(Frame::data))
.boxed(),
Expand Down Expand Up @@ -116,19 +149,26 @@ impl<'a> HttpBody for StreamBodyAs<'a> {

pub struct StreamBodyAsOptions {
pub buffering_ready_items: Option<usize>,
pub buffering_bytes: Option<usize>,
}

impl StreamBodyAsOptions {
pub fn new() -> Self {
Self {
buffering_ready_items: None,
buffering_bytes: None,
}
}

pub fn buffering_ready_items(mut self, ready_items: usize) -> Self {
self.buffering_ready_items = Some(ready_items);
self
}

pub fn buffering_bytes(mut self, ready_bytes: usize) -> Self {
self.buffering_bytes = Some(ready_bytes);
self
}
}

#[cfg(test)]
Expand Down Expand Up @@ -164,7 +204,7 @@ mod tests {
}

#[tokio::test]
async fn test_stream_body_as_buffering_item() {
async fn test_stream_body_as_buffering_items() {
let stream = futures::stream::iter(vec![
"First".to_string(),
"Second".to_string(),
Expand All @@ -187,4 +227,33 @@ mod tests {
assert_eq!(data[0], Bytes::from("FirstSecond"));
assert_eq!(data[1], Bytes::from("Third"));
}

#[tokio::test]
async fn test_stream_body_as_buffering_bytes() {
let stream = futures::stream::iter(vec![
"First".to_string(),
"Second".to_string(),
"Third".to_string(),
])
.boxed();
let stream_body_as = StreamBodyAs::with_options(
TextStreamFormat::new(),
stream,
StreamBodyAsOptions::new().buffering_bytes(3),
);
let response = stream_body_as.into_response();
assert_eq!(
response.headers().get(http::header::CONTENT_TYPE).unwrap(),
"text/plain; charset=utf-8"
);
let read = response.into_body().into_data_stream();
let data: Vec<Bytes> = read.try_collect().await.unwrap();
assert_eq!(data.len(), 6);
assert_eq!(data[0], Bytes::from("Fir"));
assert_eq!(data[1], Bytes::from("stS"));
assert_eq!(data[2], Bytes::from("eco"));
assert_eq!(data[3], Bytes::from("ndT"));
assert_eq!(data[4], Bytes::from("hir"));
assert_eq!(data[5], Bytes::from("d"));
}
}

0 comments on commit de9b19a

Please sign in to comment.