From 7eb9f6997653e6fc6772a6d886a50db9196a0f27 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 8 Dec 2023 08:55:33 +0100 Subject: [PATCH] Hyper 1/Axum 0.7 support (#33) * Support for Hyper v1/Axum 0.7 * Working text streamer * CSV stream fix * JSON support * Protobuf support * Fixed tests * Updated pipelines --- .github/workflows/tests.yml | 7 +++++-- Cargo.toml | 9 +++++---- README.md | 3 ++- examples/csv-example.rs | 17 ++++++++--------- examples/json-example.rs | 9 +++------ examples/protobuf-example.rs | 10 +++------- examples/text-example.rs | 9 +++------ src/csv_format.rs | 8 +++++--- src/json_formats.rs | 25 +++++++++++++------------ src/protobuf_format.rs | 11 +++++++---- src/stream_body_as.rs | 18 ++++++------------ src/stream_format.rs | 2 +- src/test_client.rs | 34 ++++++++++------------------------ src/text_format.rs | 12 +++++------- 14 files changed, 76 insertions(+), 98 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 112d4b1..2492f32 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,12 +3,15 @@ on: workflow_dispatch: push: pull_request: - types: [opened] + types: [opened] +concurrency: + group: ${{ github.workflow }}-${{ github.ref_protected && github.run_id || github.event.pull_request.number || github.ref }} + cancel-in-progress: true jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4 + - uses: actions/checkout@v4 - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/Cargo.toml b/Cargo.toml index 17e94a9..af3f178 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,11 @@ name = "axum_streams" path = "src/lib.rs" [dependencies] -axum = { version = "0.6" } +axum = { version = "0.7" } bytes = "1" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } -http = "0.2" +http = "1" +http-body = "1" mime = "0.3" tokio = "1" serde = { version = "1", features = ["serde_derive"], optional = true } @@ -44,10 +45,10 @@ text = [] [dev-dependencies] futures = "0.3" -hyper = "0.14" +hyper = "1" reqwest = { version = "0.11", default-features = false, features = ["json", "stream", "multipart"] } tower = { version = "0.4", default-features = false, features = ["util", "make"] } -tower-http = { version = "0.4", features = ["util", "map-response-body"] } +tower-http = { version = "0.5", features = ["util", "map-response-body"] } tower-layer = "0.3" tower-service = "0.3" tokio = { version = "1", features = ["full"] } diff --git a/README.md b/README.md index fab6c4d..930aa91 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,14 @@ and want to avoid huge memory allocation. Cargo.toml: ```toml [dependencies] -axum-streams = { version = "0.10", features=["json", "csv", "protobuf", "text"] } +axum-streams = { version = "0.11", features=["json", "csv", "protobuf", "text"] } ``` ## Compatibility matrix | axum | axum-streams | |------|--------------| +| 0.7 | v0.11 | | 0.6 | v0.9-v0.10 | | 0.5 | 0.7 | diff --git a/examples/csv-example.rs b/examples/csv-example.rs index 72a309c..e5b74ce 100644 --- a/examples/csv-example.rs +++ b/examples/csv-example.rs @@ -1,28 +1,30 @@ use axum::response::IntoResponse; use axum::routing::*; use axum::Router; -use std::net::SocketAddr; use futures::prelude::*; use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; use tokio_stream::StreamExt; use axum_streams::*; #[derive(Debug, Clone, Deserialize, Serialize)] struct MyTestStructure { - some_test_field: String, + some_test_field1: String, + some_test_field2: String, } fn source_test_stream() -> impl Stream { // Simulating a stream with a plain vector and throttling to show how it works stream::iter(vec![ MyTestStructure { - some_test_field: "test1".to_string() + some_test_field1: "test1".to_string(), + some_test_field2: "test2".to_string() }; 1000 ]) - .throttle(std::time::Duration::from_millis(50)) + .throttle(std::time::Duration::from_millis(500)) } async fn test_csv_stream() -> impl IntoResponse { @@ -45,10 +47,7 @@ async fn main() { // `GET /` goes to `root` .route("/csv-stream", get(test_csv_stream)); - let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); + axum::serve(listener, app).await.unwrap(); } diff --git a/examples/json-example.rs b/examples/json-example.rs index 6f0d398..66d1be0 100644 --- a/examples/json-example.rs +++ b/examples/json-example.rs @@ -1,10 +1,10 @@ use axum::response::IntoResponse; use axum::routing::*; use axum::Router; -use std::net::SocketAddr; use futures::prelude::*; use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; use tokio_stream::StreamExt; use axum_streams::*; @@ -41,10 +41,7 @@ async fn main() { .route("/json-array-stream", get(test_json_array_stream)) .route("/json-nl-stream", get(test_json_nl_stream)); - let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); + axum::serve(listener, app).await.unwrap(); } diff --git a/examples/protobuf-example.rs b/examples/protobuf-example.rs index 0a00cb7..727ba1d 100644 --- a/examples/protobuf-example.rs +++ b/examples/protobuf-example.rs @@ -1,9 +1,9 @@ use axum::response::IntoResponse; use axum::routing::*; use axum::Router; -use std::net::SocketAddr; use futures::prelude::*; +use tokio::net::TcpListener; use tokio_stream::StreamExt; use axum_streams::*; @@ -36,10 +36,6 @@ async fn main() { // `GET /` goes to `root` .route("/protobuf-stream", get(test_protobuf_stream)); - let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); - - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + axum::serve(listener, app).await.unwrap(); } diff --git a/examples/text-example.rs b/examples/text-example.rs index e6766d1..52397af 100644 --- a/examples/text-example.rs +++ b/examples/text-example.rs @@ -1,9 +1,9 @@ use axum::response::IntoResponse; use axum::routing::*; use axum::Router; -use std::net::SocketAddr; use futures::prelude::*; +use tokio::net::TcpListener; use tokio_stream::StreamExt; use axum_streams::*; @@ -28,10 +28,7 @@ async fn main() { // `GET /` goes to `root` .route("/text-stream", get(test_text_stream)); - let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); + axum::serve(listener, app).await.unwrap(); } diff --git a/src/csv_format.rs b/src/csv_format.rs index 2de9328..b2a2b60 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -3,6 +3,7 @@ use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; +use http_body::Frame; use serde::Serialize; pub struct CsvStreamFormat { @@ -96,7 +97,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - ) -> BoxStream<'b, Result> { + ) -> BoxStream<'b, Result, axum::Error>> { let stream_with_header = self.has_headers; let stream_delimiter = self.delimiter; let stream_flexible = self.flexible; @@ -106,7 +107,7 @@ where let stream_escape = self.escape; let terminator = self.terminator; - let stream_bytes: BoxStream> = Box::pin({ + let stream_bytes: BoxStream, axum::Error>> = Box::pin({ stream.enumerate().map(move |(index, obj)| { let mut writer = csv::WriterBuilder::new() .has_headers(index == 0 && stream_with_header) @@ -125,6 +126,7 @@ where .into_inner() .map_err(axum::Error::new) .map(axum::body::Bytes::from) + .map(Frame::data) }) }); @@ -188,7 +190,7 @@ mod tests { }), ); - let client = TestClient::new(app); + let client = TestClient::new(app).await; let expected_csv = test_stream_vec .iter() diff --git a/src/json_formats.rs b/src/json_formats.rs index 452d1c1..eb91eee 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -4,6 +4,7 @@ use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; +use http_body::Frame; use serde::Serialize; use std::io::Write; @@ -22,8 +23,8 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - ) -> BoxStream<'b, Result> { - let stream_bytes: BoxStream> = Box::pin({ + ) -> BoxStream<'b, Result, axum::Error>> { + let stream_bytes: BoxStream, axum::Error>> = Box::pin({ stream.enumerate().map(|(index, obj)| { let mut buf = BytesMut::new().writer(); @@ -37,7 +38,7 @@ where match sep_write_res { Ok(_) => { match serde_json::to_writer(&mut buf, &obj).map_err(axum::Error::new) { - Ok(_) => Ok(buf.into_inner().freeze()), + Ok(_) => Ok(Frame::data(buf.into_inner().freeze())), Err(e) => Err(e), } } @@ -46,14 +47,14 @@ where }) }); - let prepend_stream: BoxStream> = + let prepend_stream: BoxStream, axum::Error>> = Box::pin(futures_util::stream::once(futures_util::future::ready( - Ok::<_, axum::Error>(axum::body::Bytes::from(JSON_ARRAY_BEGIN_BYTES)), + Ok::<_, axum::Error>(Frame::data(axum::body::Bytes::from(JSON_ARRAY_BEGIN_BYTES))), ))); - let append_stream: BoxStream> = + let append_stream: BoxStream, axum::Error>> = Box::pin(futures_util::stream::once(futures_util::future::ready( - Ok::<_, axum::Error>(axum::body::Bytes::from(JSON_ARRAY_END_BYTES)), + Ok::<_, axum::Error>(Frame::data(axum::body::Bytes::from(JSON_ARRAY_END_BYTES))), ))); Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream))) @@ -84,13 +85,13 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - ) -> BoxStream<'b, Result> { - let stream_bytes: BoxStream> = Box::pin({ + ) -> BoxStream<'b, Result, axum::Error>> { + let stream_bytes: BoxStream, axum::Error>> = Box::pin({ stream.map(|obj| { let mut buf = BytesMut::new().writer(); match serde_json::to_writer(&mut buf, &obj).map_err(axum::Error::new) { Ok(_) => match buf.write_all(JSON_NL_SEP_BYTES).map_err(axum::Error::new) { - Ok(_) => Ok(buf.into_inner().freeze()), + Ok(_) => Ok(Frame::data(buf.into_inner().freeze())), Err(e) => Err(e), }, Err(e) => Err(e), @@ -164,7 +165,7 @@ mod tests { get(|| async { StreamBodyAs::new(JsonArrayStreamFormat::new(), test_stream) }), ); - let client = TestClient::new(app); + let client = TestClient::new(app).await; let expected_json = serde_json::to_string(&test_stream_vec).unwrap(); let res = client.get("/").send().await.unwrap(); @@ -201,7 +202,7 @@ mod tests { get(|| async { StreamBodyAs::new(JsonNewLineStreamFormat::new(), test_stream) }), ); - let client = TestClient::new(app); + let client = TestClient::new(app).await; let expected_json = test_stream_vec .iter() diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index c354927..631ae54 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -3,6 +3,7 @@ use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; +use http_body::Frame; pub struct ProtobufStreamFormat; @@ -19,7 +20,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - ) -> BoxStream<'b, Result> { + ) -> BoxStream<'b, Result, axum::Error>> { fn write_protobuf_record(obj: T) -> Result, axum::Error> where T: prost::Message, @@ -33,10 +34,12 @@ where Ok(frame_vec) } - let stream_bytes: BoxStream> = Box::pin({ + let stream_bytes: BoxStream, axum::Error>> = Box::pin({ stream.map(move |obj| { let write_protobuf_res = write_protobuf_record(obj); - write_protobuf_res.map(axum::body::Bytes::from) + write_protobuf_res + .map(axum::body::Bytes::from) + .map(Frame::data) }) }); @@ -97,7 +100,7 @@ mod tests { get(|| async { StreamBodyAs::new(ProtobufStreamFormat::new(), test_stream) }), ); - let client = TestClient::new(app); + let client = TestClient::new(app).await; let expected_proto_buf: Vec = test_stream_vec .iter() diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index a8632e7..0e51405 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -1,15 +1,16 @@ use crate::stream_format::StreamingFormat; -use axum::body::HttpBody; +use axum::body::{Body, HttpBody}; use axum::response::{IntoResponse, Response}; use futures::Stream; use futures_util::stream::BoxStream; use http::HeaderMap; +use http_body::Frame; use std::fmt::Formatter; use std::pin::Pin; use std::task::{Context, Poll}; pub struct StreamBodyAs<'a> { - stream: BoxStream<'a, Result>, + stream: BoxStream<'a, Result, axum::Error>>, trailers: Option, } @@ -46,7 +47,7 @@ impl IntoResponse for StreamBodyAs<'static> { HeaderMap::new() }; - let mut response = Response::new(axum::body::boxed(self)); + let mut response: Response = Response::new(Body::new(self)); *response.headers_mut() = headers; response } @@ -56,17 +57,10 @@ impl<'a> HttpBody for StreamBodyAs<'a> { type Data = axum::body::Bytes; type Error = axum::Error; - fn poll_data( + fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { Pin::new(&mut self.stream).poll_next(cx) } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(self.trailers.clone())) - } } diff --git a/src/stream_format.rs b/src/stream_format.rs index 4e1cbf0..f34d857 100644 --- a/src/stream_format.rs +++ b/src/stream_format.rs @@ -5,7 +5,7 @@ pub trait StreamingFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - ) -> BoxStream<'b, Result>; + ) -> BoxStream<'b, Result, axum::Error>>; fn http_response_trailers(&self) -> Option; } diff --git a/src/test_client.rs b/src/test_client.rs index 8b4c3f9..f2559ec 100644 --- a/src/test_client.rs +++ b/src/test_client.rs @@ -1,12 +1,9 @@ -use axum::body::HttpBody; -use http::Request; - +use axum::Router; use reqwest::RequestBuilder; -use std::net::{SocketAddr, TcpListener}; -use tower::make::Shared; -use tower_service::Service; +use std::net::SocketAddr; +use tokio::net::TcpListener; -// This class is a copy from Axum project (https://github.com/tokio-rs/axum), since +// This class was originally a copy from Axum project (https://github.com/tokio-rs/axum), since // this not available for external crates to use in tests pub(crate) struct TestClient { client: reqwest::Client, @@ -14,26 +11,15 @@ pub(crate) struct TestClient { } impl TestClient { - pub(crate) fn new(svc: S) -> Self - where - S: Service, Response = http::Response> - + Clone - + Send - + 'static, - ResBody: HttpBody + Send + 'static, - ResBody::Data: Send, - ResBody::Error: Into, - S::Future: Send, - S::Error: Into, - { - let listener = TcpListener::bind("127.0.0.1:0").expect("Could not bind ephemeral socket"); - let addr = listener.local_addr().unwrap(); + pub(crate) async fn new(router: Router) -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("Could not bind ephemeral socket"); + let addr = listener.local_addr().unwrap().clone(); println!("Listening on {}", addr); tokio::spawn(async move { - let server = hyper::server::Server::from_tcp(listener) - .unwrap() - .serve(Shared::new(svc)); + let server = axum::serve(listener, router); server.await.expect("server error"); }); diff --git a/src/text_format.rs b/src/text_format.rs index 72ac8af..06b7d3b 100644 --- a/src/text_format.rs +++ b/src/text_format.rs @@ -3,6 +3,7 @@ use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; +use http_body::Frame; pub struct TextStreamFormat; @@ -16,17 +17,14 @@ impl StreamingFormat for TextStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, String>, - ) -> BoxStream<'b, Result> { + ) -> BoxStream<'b, Result, axum::Error>> { fn write_text_record(obj: String) -> Result, axum::Error> { let obj_vec = obj.as_bytes().to_vec(); Ok(obj_vec) } - let stream_bytes: BoxStream> = Box::pin({ - stream.map(move |obj| { - let write_text_res = write_text_record(obj); - write_text_res.map(axum::body::Bytes::from) - }) + let stream_bytes: BoxStream, axum::Error>> = Box::pin({ + stream.map(move |obj| write_text_record(obj).map(|data| Frame::data(data.into()))) }); Box::pin(stream_bytes) @@ -88,7 +86,7 @@ mod tests { get(|| async { StreamBodyAs::new(TextStreamFormat::new(), test_stream) }), ); - let client = TestClient::new(app); + let client = TestClient::new(app).await; let expected_text_buf: Vec = test_stream_vec .iter()