Skip to content

Commit

Permalink
Docs and sample update
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Jun 20, 2024
1 parent 3e961d8 commit 17860f1
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ and want to avoid huge memory allocation.
Cargo.toml:
```toml
[dependencies]
axum-streams = { version = "0.14", features=["json", "csv", "protobuf", "text"] }
axum-streams = { version = "0.15", features=["json", "csv", "protobuf", "text"] }
```

## Compatibility matrix
Expand Down
4 changes: 3 additions & 1 deletion examples/text-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ fn source_test_stream() -> impl Stream<Item = String> {
}

async fn test_text_stream() -> impl IntoResponse {
StreamBodyAs::text(source_test_stream())
StreamBodyAsOptions::new()
.content_type(HttpHeaderValue::from_static("text/plain; charset=utf-8"))
.text(source_test_stream());
}

#[tokio::main]
Expand Down
8 changes: 5 additions & 3 deletions src/arrow_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchIpcStreamFormat {
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, RecordBatch>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
fn write_batch(
ipc_data_gen: &mut IpcDataGenerator,
Expand Down Expand Up @@ -111,11 +111,13 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchIpcStreamFormat {
Box::pin(batch_stream.chain(append_stream))
}

fn http_response_trailers(&self,_: &StreamBodyAsOptions) -> Option<HeaderMap> {
fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream"),
options.content_type.clone().unwrap_or_else(|| {
http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream")
}),
);
Some(header_map)
}
Expand Down
9 changes: 6 additions & 3 deletions src/csv_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, T>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
let stream_with_header = self.has_headers;
let stream_delimiter = self.delimiter;
Expand Down Expand Up @@ -132,11 +132,14 @@ where
})
}

fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option<HeaderMap> {
fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("text/csv"),
options
.content_type
.clone()
.unwrap_or_else(|| http::header::HeaderValue::from_static("text/csv")),
);
Some(header_map)
}
Expand Down
11 changes: 7 additions & 4 deletions src/json_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, T>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
let stream_bytes: BoxStream<Result<axum::body::Bytes, axum::Error>> = Box::pin({
stream.enumerate().map(|(index, obj)| {
Expand Down Expand Up @@ -116,11 +116,14 @@ where
Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream)))
}

fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option<HeaderMap> {
fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("application/json"),
options
.content_type
.clone()
.unwrap_or_else(|| http::header::HeaderValue::from_static("application/json")),
);
Some(header_map)
}
Expand All @@ -141,7 +144,7 @@ where
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, T>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
Box::pin({
stream.map(|obj| {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod stream_format;
pub use stream_format::*;

mod stream_body_as;
pub use self::stream_body_as::HttpHeaderValue;
pub use self::stream_body_as::StreamBodyAs;
pub use self::stream_body_as::StreamBodyAsOptions;

Expand Down
8 changes: 5 additions & 3 deletions src/protobuf_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ where
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, T>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
fn write_protobuf_record<T>(obj: T) -> Result<Vec<u8>, axum::Error>
where
Expand All @@ -44,11 +44,13 @@ where
})
}

fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option<HeaderMap> {
fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("application/x-protobuf-stream"),
options.content_type.clone().unwrap_or_else(|| {
http::header::HeaderValue::from_static("application/x-protobuf-stream")
}),
);
Some(header_map)
}
Expand Down
20 changes: 13 additions & 7 deletions src/stream_body_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ impl<'a> StreamBodyAs<'a> {
})
.boxed(),
(_, Some(buffering_bytes)) => {
let bytes_stream =
stream_format
.to_bytes_stream(Box::pin(stream), options)
.chain(futures::stream::once(futures::future::ready(Ok(
bytes::Bytes::new(),
))));
let bytes_stream = stream_format
.to_bytes_stream(Box::pin(stream), options)
.chain(futures::stream::once(futures::future::ready(Ok(
bytes::Bytes::new(),
))));

bytes_stream
.scan(
Expand Down Expand Up @@ -147,10 +146,12 @@ impl<'a> HttpBody for StreamBodyAs<'a> {
}
}

pub type HttpHeaderValue = http::header::HeaderValue;

pub struct StreamBodyAsOptions {
pub buffering_ready_items: Option<usize>,
pub buffering_bytes: Option<usize>,
pub content_type: Option<HeaderValue>,
pub content_type: Option<HttpHeaderValue>,
}

impl StreamBodyAsOptions {
Expand All @@ -171,6 +172,11 @@ impl StreamBodyAsOptions {
self.buffering_bytes = Some(ready_bytes);
self
}

pub fn content_type(mut self, content_type: HttpHeaderValue) -> Self {
self.content_type = Some(content_type);
self
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/stream_format.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::StreamBodyAsOptions;
use futures::stream::BoxStream;
use http::HeaderMap;
use crate::StreamBodyAsOptions;

pub trait StreamingFormat<T> {
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, T>,
options: &'a StreamBodyAsOptions
options: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>>;

fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap>;
Expand Down
8 changes: 5 additions & 3 deletions src/text_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl StreamingFormat<String> for TextStreamFormat {
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, String>,
_: &'a StreamBodyAsOptions
_: &'a StreamBodyAsOptions,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
fn write_text_record(obj: String) -> Result<Vec<u8>, axum::Error> {
let obj_vec = obj.as_bytes().to_vec();
Expand All @@ -28,11 +28,13 @@ impl StreamingFormat<String> for TextStreamFormat {
Box::pin(stream.map(move |obj| write_text_record(obj).map(|data| data.into())))
}

fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option<HeaderMap> {
fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("text/plain; charset=utf-8"),
options.content_type.clone().unwrap_or_else(|| {
http::header::HeaderValue::from_static("text/plain; charset=utf-8")
}),
);
Some(header_map)
}
Expand Down

0 comments on commit 17860f1

Please sign in to comment.