Skip to content

Commit

Permalink
Working text streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Dec 8, 2023
1 parent 6f3d358 commit be14a1c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 14 deletions.
9 changes: 3 additions & 6 deletions examples/text-example.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use axum::response::IntoResponse;
use axum::routing::*;
use axum::Router;
use std::net::SocketAddr;

use futures::prelude::*;
use tokio::net::TcpListener;
use tokio_stream::StreamExt;

use axum_streams::*;
Expand All @@ -28,10 +28,7 @@ async fn main() {
// `GET /` goes to `root`
.route("/text-stream", get(test_text_stream));

let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();

axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
}
5 changes: 2 additions & 3 deletions src/stream_body_as.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::stream_format::StreamingFormat;
use axum::body::{Body, HttpBody};
use axum::response::{IntoResponse, Response};
use futures::Stream;
use futures_util::stream::BoxStream;
use http::HeaderMap;
use http_body::Frame;
use std::fmt::Formatter;
use std::pin::Pin;
use std::task::{Context, Poll};
use axum::body::{Body, HttpBody};
use http_body::Frame;

pub struct StreamBodyAs<'a> {
stream: BoxStream<'a, Result<Frame<axum::body::Bytes>, axum::Error>>,
Expand Down Expand Up @@ -63,5 +63,4 @@ impl<'a> HttpBody for StreamBodyAs<'a> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}

}
7 changes: 2 additions & 5 deletions src/text_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ impl StreamingFormat<String> for TextStreamFormat {
Ok(obj_vec)
}

let stream_bytes: BoxStream<Result<axum::body::Bytes, axum::Error>> = Box::pin({
stream.map(move |obj| {
let write_text_res = write_text_record(obj);
write_text_res.map(axum::body::Bytes::from)
})
let stream_bytes: BoxStream<Result<Frame<axum::body::Bytes>, axum::Error>> = Box::pin({
stream.map(move |obj| write_text_record(obj).map(|data| Frame::data(data.into())))
});

Box::pin(stream_bytes)
Expand Down

0 comments on commit be14a1c

Please sign in to comment.