From 17860f16cfbb460196f272594852cc580a7b95a5 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 20 Jun 2024 16:13:57 +0200 Subject: [PATCH] Docs and sample update --- README.md | 2 +- examples/text-example.rs | 4 +++- src/arrow_format.rs | 8 +++++--- src/csv_format.rs | 9 ++++++--- src/json_formats.rs | 11 +++++++---- src/lib.rs | 1 + src/protobuf_format.rs | 8 +++++--- src/stream_body_as.rs | 20 +++++++++++++------- src/stream_format.rs | 4 ++-- src/text_format.rs | 8 +++++--- 10 files changed, 48 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index d15060b..63dc2a3 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ and want to avoid huge memory allocation. Cargo.toml: ```toml [dependencies] -axum-streams = { version = "0.14", features=["json", "csv", "protobuf", "text"] } +axum-streams = { version = "0.15", features=["json", "csv", "protobuf", "text"] } ``` ## Compatibility matrix diff --git a/examples/text-example.rs b/examples/text-example.rs index 52397af..bfe4ea8 100644 --- a/examples/text-example.rs +++ b/examples/text-example.rs @@ -18,7 +18,9 @@ fn source_test_stream() -> impl Stream { } async fn test_text_stream() -> impl IntoResponse { - StreamBodyAs::text(source_test_stream()) + StreamBodyAsOptions::new() + .content_type(HttpHeaderValue::from_static("text/plain; charset=utf-8")) + .text(source_test_stream()); } #[tokio::main] diff --git a/src/arrow_format.rs b/src/arrow_format.rs index fe04672..7395f2d 100644 --- a/src/arrow_format.rs +++ b/src/arrow_format.rs @@ -34,7 +34,7 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, RecordBatch>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_batch( ipc_data_gen: &mut IpcDataGenerator, @@ -111,11 +111,13 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { Box::pin(batch_stream.chain(append_stream)) } - fn http_response_trailers(&self,_: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream") + }), ); Some(header_map) } diff --git a/src/csv_format.rs b/src/csv_format.rs index 87796f4..1161758 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -98,7 +98,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_with_header = self.has_headers; let stream_delimiter = self.delimiter; @@ -132,11 +132,14 @@ where }) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/csv"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("text/csv")), ); Some(header_map) } diff --git a/src/json_formats.rs b/src/json_formats.rs index 61b4750..6e818cf 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -42,7 +42,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_bytes: BoxStream> = Box::pin({ stream.enumerate().map(|(index, obj)| { @@ -116,11 +116,14 @@ where Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream))) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/json"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("application/json")), ); Some(header_map) } @@ -141,7 +144,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { Box::pin({ stream.map(|obj| { diff --git a/src/lib.rs b/src/lib.rs index 9bb9072..84937f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ mod stream_format; pub use stream_format::*; mod stream_body_as; +pub use self::stream_body_as::HttpHeaderValue; pub use self::stream_body_as::StreamBodyAs; pub use self::stream_body_as::StreamBodyAsOptions; diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index 5b906cc..5889f7b 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -21,7 +21,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_protobuf_record(obj: T) -> Result, axum::Error> where @@ -44,11 +44,13 @@ where }) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/x-protobuf-stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/x-protobuf-stream") + }), ); Some(header_map) } diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index 7a75457..3373aa5 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -84,12 +84,11 @@ impl<'a> StreamBodyAs<'a> { }) .boxed(), (_, Some(buffering_bytes)) => { - let bytes_stream = - stream_format - .to_bytes_stream(Box::pin(stream), options) - .chain(futures::stream::once(futures::future::ready(Ok( - bytes::Bytes::new(), - )))); + let bytes_stream = stream_format + .to_bytes_stream(Box::pin(stream), options) + .chain(futures::stream::once(futures::future::ready(Ok( + bytes::Bytes::new(), + )))); bytes_stream .scan( @@ -147,10 +146,12 @@ impl<'a> HttpBody for StreamBodyAs<'a> { } } +pub type HttpHeaderValue = http::header::HeaderValue; + pub struct StreamBodyAsOptions { pub buffering_ready_items: Option, pub buffering_bytes: Option, - pub content_type: Option, + pub content_type: Option, } impl StreamBodyAsOptions { @@ -171,6 +172,11 @@ impl StreamBodyAsOptions { self.buffering_bytes = Some(ready_bytes); self } + + pub fn content_type(mut self, content_type: HttpHeaderValue) -> Self { + self.content_type = Some(content_type); + self + } } #[cfg(test)] diff --git a/src/stream_format.rs b/src/stream_format.rs index 59be9fd..beb8c33 100644 --- a/src/stream_format.rs +++ b/src/stream_format.rs @@ -1,12 +1,12 @@ +use crate::StreamBodyAsOptions; use futures::stream::BoxStream; use http::HeaderMap; -use crate::StreamBodyAsOptions; pub trait StreamingFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - options: &'a StreamBodyAsOptions + options: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result>; fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option; diff --git a/src/text_format.rs b/src/text_format.rs index 13ecdd1..c519af6 100644 --- a/src/text_format.rs +++ b/src/text_format.rs @@ -18,7 +18,7 @@ impl StreamingFormat for TextStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, String>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_text_record(obj: String) -> Result, axum::Error> { let obj_vec = obj.as_bytes().to_vec(); @@ -28,11 +28,13 @@ impl StreamingFormat for TextStreamFormat { Box::pin(stream.map(move |obj| write_text_record(obj).map(|data| data.into()))) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/plain; charset=utf-8"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("text/plain; charset=utf-8") + }), ); Some(header_map) }