From c672214fe59a2134409f02ff83c376892923320e Mon Sep 17 00:00:00 2001 From: ihciah Date: Fri, 26 Jan 2024 09:11:45 +0000 Subject: [PATCH] feat: support poll-io feature to run epoll over iouring --- Cargo.toml | 1 - docs/en/compatiable-with-tokio-eco.md | 50 ++-- docs/zh/compatiable-with-tokio-eco.md | 18 +- examples/Cargo.toml | 14 +- examples/echo_poll.rs | 48 ++++ examples/hyper_client.rs | 149 +++-------- examples/hyper_server.rs | 49 ++-- examples/tokio-io-compat/Cargo.toml | 14 - examples/tokio-io-compat/src/main.rs | 130 --------- monoio-compat/Cargo.toml | 18 +- monoio-compat/src/hyper.rs | 241 +++++++++++++++++ monoio-compat/src/lib.rs | 3 + monoio-macros/Cargo.toml | 2 +- monoio/Cargo.toml | 15 +- monoio/src/blocking.rs | 1 + monoio/src/buf/raw_buf.rs | 6 +- monoio/src/buf/slice.rs | 22 +- monoio/src/driver/legacy/mod.rs | 70 +++-- monoio/src/driver/mod.rs | 39 ++- monoio/src/driver/op.rs | 27 +- monoio/src/driver/op/accept.rs | 13 +- monoio/src/driver/op/close.rs | 13 +- monoio/src/driver/op/connect.rs | 14 +- monoio/src/driver/op/fsync.rs | 21 +- monoio/src/driver/op/open.rs | 13 +- monoio/src/driver/op/poll.rs | 17 +- monoio/src/driver/op/read.rs | 14 +- monoio/src/driver/op/recv.rs | 19 +- monoio/src/driver/op/send.rs | 20 +- monoio/src/driver/op/splice.rs | 3 +- monoio/src/driver/op/write.rs | 14 +- monoio/src/driver/poll.rs | 108 ++++++++ monoio/src/driver/{legacy => }/ready.rs | 3 + .../src/driver/{legacy => }/scheduled_io.rs | 21 +- monoio/src/driver/shared_fd.rs | 250 ++++++++++++++---- monoio/src/driver/thread.rs | 14 +- monoio/src/driver/uring/mod.rs | 117 +++++++- monoio/src/fs/file.rs | 2 +- monoio/src/io/mod.rs | 34 +++ monoio/src/io/util/buf_reader.rs | 8 +- monoio/src/io/util/buf_writer.rs | 2 + monoio/src/io/util/cancel.rs | 2 + monoio/src/io/util/prefixed_io.rs | 5 +- monoio/src/macros/scoped_tls.rs | 1 + monoio/src/net/tcp/listener.rs | 8 +- monoio/src/net/tcp/mod.rs | 3 + monoio/src/net/tcp/stream.rs | 6 +- monoio/src/net/tcp/stream_poll.rs | 176 ++++++++++++ monoio/src/net/udp.rs | 4 +- monoio/src/net/unix/datagram/mod.rs | 4 +- monoio/src/net/unix/listener.rs | 8 +- monoio/src/net/unix/mod.rs | 3 + monoio/src/net/unix/seq_packet/listener.rs | 4 +- monoio/src/net/unix/seq_packet/mod.rs | 6 +- monoio/src/net/unix/stream.rs | 6 +- monoio/src/net/unix/stream_poll.rs | 146 ++++++++++ 56 files changed, 1473 insertions(+), 546 deletions(-) create mode 100644 examples/echo_poll.rs delete mode 100644 examples/tokio-io-compat/Cargo.toml delete mode 100644 examples/tokio-io-compat/src/main.rs create mode 100644 monoio-compat/src/hyper.rs create mode 100644 monoio/src/driver/poll.rs rename monoio/src/driver/{legacy => }/ready.rs (98%) rename monoio/src/driver/{legacy => }/scheduled_io.rs (86%) create mode 100644 monoio/src/net/tcp/stream_poll.rs create mode 100644 monoio/src/net/unix/stream_poll.rs diff --git a/Cargo.toml b/Cargo.toml index e8402126..95d47267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,5 @@ members = [ # Internal "examples", - "examples/tokio-io-compat", ] resolver = "2" diff --git a/docs/en/compatiable-with-tokio-eco.md b/docs/en/compatiable-with-tokio-eco.md index c0140841..20f2cd96 100644 --- a/docs/en/compatiable-with-tokio-eco.md +++ b/docs/en/compatiable-with-tokio-eco.md @@ -1,38 +1,50 @@ --- title: Compatible with Tokio ecology date: 2021-12-17 18:00:00 +updated: 2024-01-30 15:00:00 author: ihciah --- -# Compatible with Tokio ecology -A large number of existing Rust components are compatible with Tokio, and they directly rely on Tokio's `AsyncRead` and `AsyncWrite` interfaces. +# Compatible with the Tokio Ecosystem +A large number of existing Rust components are compatible with Tokio and directly depend on Tokio's `AsyncRead` and `AsyncWrite` traits. -In Monoio, since the bottom layer is an asynchronous system call, we chose a similar tokio-uring approach: providing an IO interface that transfers the ownership of the buffer. But at this stage, obviously there are not many available libraries that can work, so we need to quickly support functions with a certain performance sacrifice. +In Monoio, due to the underlying asynchronous system calls, we chose an approach similar to tokio-uring: providing IO interfaces that transfer buffer ownership. However, at this stage, there are obviously not many libraries available that work with this, so we need some means to be compatible with existing interface components. -## monoio-compat -`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on the buffer ownership interface. +Currently, there are 3 ways to achieve compatibility: + +## tokio-compat +`tokio-compat` is a feature of monoio. When the `iouring` is disabled and the `legacy` feature is enabled, after turning on the `tokio-compat` feature, `TcpStream`/`UnixStream` will implement `tokio::io::{AsyncRead, AsyncWrite}`. + +If you explicitly do not use iouring, then you can provide compatibility in this way. This form of compatibility has no overhead. If you might use iouring, then you should use the `poll-io` feature. -### How it works -For `poll_read`, the remaining capacity of the slice passed by the user will be read first, and then the buffer held by the user will be limited to this capacity and a future will be generated. After that, every time the user `poll_read` again, it will be forwarded to the `poll` method of this future. When returning to `Ready`, the contents of the buffer are copied to the slice passed by the user. +## poll-io +`poll-io` is a feature of monoio. After enabling this feature: +1. `tokio::io` will be re-exported to `monoio::io::poll_io` +2. `TcpStream`/`UnixStream` can be converted to and from `TcpStreamPoll`/`UnixStreamPoll` +3. `TcpStreamPoll`/`UnixStreamPoll` implements `tokio::io::{AsyncRead, AsyncWrite}` -For `poll_write`, the content of the slice passed by the user is copied to its own buffer first, then a future is generated and stored, and Ready is returned immediately. After that, every time the user `poll_write` again, it will first wait for the last content to be sent, then copy the data and return to Ready immediately. It behaves like BufWriter, and may causing errors to be delayed to be perceived. +The underlying implementation of this feature runs epoll to sense fd readiness on top of iouring and directly initiates a syscall. Although this form of compatibility cannot effectively utilize iouring for asynchronous io, its performance is similar to other epoll+syscall implementations without additional copy overhead. + +## monoio-compat +`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on an interface that transfers buffer ownership. -At the cost, using this compatibility layer will cost you an extra buffer copy. +### How It Works +For `poll_read`, it first reads into the remaining capacity of the slice passed by the user, then restricts its own buffer to that capacity and generates a future. Afterwards, every time the user again calls `poll_read`, it will forward to the `poll` method of this future. When returning `Ready`, it copies the contents of the buffer into the user's passed slice. -For `poll_write`, the content of the slice passed by the user is first copied to its own buffer, and then a future is generated and stored. After that, every time the user `poll_write` again, it will be forwarded to the `poll` method of this future. Return the result to the user when returning `Ready`. +For `poll_write`, it first copies the contents of the user's passed slice into its own buffer, then generates a future and stores it, and immediately returns Ready. Thereafter, every time the user again calls `poll_write`, it will first wait for the content of the last write to be fully sent before copying data and immediately returning Ready. This behavior is similar to that of a BufWriter, which can lead to delayed error detection. -In other words, using this compatibility layer will cost you an extra buffer copy overhead. +The cost of using this compatibility layer is an additional buffer copy overhead. -### Usage restrictions -For write operations, users need to manually call poll_flush or poll_shutdown of AsyncWrite at the end (or the corresponding flush or shutdown methods of AsyncWriteExt), otherwise the data may not be submitted to the kernel (continuous writes do not require manual flushing). +### Usage Restrictions +For write operations, users need to manually call AsyncWrite's poll_flush or poll_shutdown (or the corresponding flush or shutdown methods in AsyncWriteExt) at the end; otherwise, data may not be submitted to the kernel (continuous writes do not require manual flushing). -## Poll-oriented interface and asynchronous system call -There are two ways to express asynchrony in Rust, one is based on `poll` and the other is based on `async`. `poll` is synchronous, semantically expressing an immediate attempt; while `async` is essentially the syntactic sugar of poll, it will swallow the future and generate a state machine, which is executed in a loop during await. +## Poll-Based Interfaces and Asynchronous System Calls +There are two ways to express asynchrony in Rust: one is based on `poll`, and the other is based on `async`. `Poll` is synchronous and semantically expresses the trying immediately; while `async` is essentially syntactic sugar for poll which encapsulates the future and generates a state machine, executing this state machine in a loop when awaiting. -In Tokio, there are methods similar to `poll_read` and `poll_write`, both of which express the semantics of synchronous attempts. +In Tokio, there are methods like `poll_read` and `poll_write` that both express the semantic of synchronous trying. -When it returns to `Pending`, it means that IO is not ready (and the waker of cx is registered for notification), and when it returns to `Ready` it means that IO has been completed. It is easy to implement these two interfaces based on synchronous system calls. Directly make the corresponding system calls and determine the return result. If the IO is not ready, it will be suspended to Reactor. +When `Pending` is returned, it implies that the IO is not ready (and registers a notice with the waker in cx), and when `Ready` is returned, it means the IO has completed. It is easy to implement these two interfaces based on synchronous system calls, by directly making the corresponding system call and judging the return result, and if the IO is not ready, suspend to the Reactor. -However, these two interfaces are difficult to implement under asynchronous system calls. If we have pushed an Op into the io_uring SQ, the state of this syscall is uncertain before the corresponding CQE is consumed. We have no way to provide clear completed or unfinished semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities has led to our use restrictions. Under asynchronous system calls, it is more appropriate to pass the ownership of the buffer and cooperate with `async+await`. +However, these two interfaces are difficult to implement under asynchronous system calls. If we have already pushed an Op into the io_uring SQ, then the status of that syscall is uncertain until we consume the corresponding CQE. We cannot provide a clear completed or not completed semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities leads to our usage restrictions. Under asynchronous system calls, transferring buffer ownership in combination with `async+await` is more appropriate. -At present, the Rust standard library does not have a interface for asynchronous system calls, and there is no related component ecology. We are working hard to solve this problem. \ No newline at end of file +Currently, Rust's standard library does not have a universal interface oriented towards asynchronous system calls, and neither does the related component ecosystem. We are working hard to improve this problem. \ No newline at end of file diff --git a/docs/zh/compatiable-with-tokio-eco.md b/docs/zh/compatiable-with-tokio-eco.md index 94712d32..f9f9a765 100644 --- a/docs/zh/compatiable-with-tokio-eco.md +++ b/docs/zh/compatiable-with-tokio-eco.md @@ -1,13 +1,29 @@ --- title: 与 Tokio 生态兼容 date: 2021-12-17 18:00:00 +updated: 2024-01-30 15:00:00 author: ihciah --- # 与 Tokio 生态兼容 现有 Rust 组件中有大量的组件与 Tokio 是兼容的,它们直接依赖了 Tokio 的 `AsyncRead` 和 `AsyncWrite` 接口。 -而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要以一定的性能牺牲来快速支持功能。 +而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要一些手段来兼容现有接口的组件。 + +当前共有 3 种兼容手段: + +## tokio-compat +`tokio-compat` 是 monoio 的一个 feature。当关闭 `iouring` 且打开 `legacy` feature 时,开启 `tokio-compat` feature 后,`TcpStream`/`UnixStream` 会实现 `tokio::io::{AsyncRead, AsyncWrite}`。 + +如果你明确不使用 iouring,那么你可以通过这种方式提供兼容性。这种形式的兼容性是没有 overhead 的。如果你有可能会使用 iouring,那么你应该使用 `poll-io` feature。 + +## poll-io +`poll-io` 是 monoio 的一个 feature。当开启该 feature 后: +1. `tokio::io` 会被 reexport 到 `monoio::io::poll_io` +2. `TcpStream`/`UnixStream` 可以与 `TcpStreamPoll`/`UnixStreamPoll` 互相转换 +3. `TcpStreamPoll`/`UnixStreamPoll` 实现 `tokio::io::{AsyncRead, AsyncWrite}` + +这个 feature 的底层实现是在 iouring 上运行 epoll 来感知 fd readiness,并直接发起 syscall。这种形式的兼容虽然无法有效利用 iouring 做异步 io,但它的性能与其他基于 epoll+syscall 的实现是类似的,没有额外的拷贝开销。 ## monoio-compat `monoio-compat` 是一个兼容层,它基于 buffer 所有权的接口实现 Tokio 的 `AsyncRead` 和 `AsyncWrite`。 diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 577a00ff..5b1ec5fa 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -17,6 +17,7 @@ monoio = { path = "../monoio", default-features = false, features = [ "legacy", "macros", "utils", + "poll-io", # experimental ] } # Enable tracing and tracing-subscriber for print out runtime debug @@ -25,15 +26,16 @@ monoio = { path = "../monoio", default-features = false, features = [ # tracing-subscriber = "0.3" # For hyper examples -hyper = { version = "0.14", features = ["http1", "client", "server", "stream"] } +hyper = { version = "1.1", features = ["http1", "client", "server"] } +http-body-util = "0.1" # For h2 examples bytes = { version = "1" } -h2 = { version = "0.3" } -http = { version = "0.2" } +h2 = { version = "0.4" } +http = { version = "1" } # For hyper and h2 examples -monoio-compat = { path = "../monoio-compat" } +monoio-compat = { path = "../monoio-compat", features = ["hyper"] } tokio = { version = "1", default-features = false, features = ["io-util"] } tower-service = "0.3" @@ -70,6 +72,10 @@ path = "echo.rs" name = "echo-tfo" path = "echo_tfo.rs" +[[example]] +name = "echo-poll" +path = "echo_poll.rs" + [[example]] name = "join" path = "join.rs" diff --git a/examples/echo_poll.rs b/examples/echo_poll.rs new file mode 100644 index 00000000..cd1be5c1 --- /dev/null +++ b/examples/echo_poll.rs @@ -0,0 +1,48 @@ +//! An echo example. +//! +//! Run the example and `nc 127.0.0.1 50002` in another shell. +//! All your input will be echoed out. + +use monoio::{ + io::{ + poll_io::{AsyncReadExt, AsyncWriteExt}, + IntoPollIo, + }, + net::{TcpListener, TcpStream}, +}; + +#[monoio::main(driver = "fusion")] +async fn main() { + let listener = TcpListener::bind("127.0.0.1:50002").unwrap(); + println!("listening"); + loop { + let incoming = listener.accept().await; + match incoming { + Ok((stream, addr)) => { + println!("accepted a connection from {addr}"); + monoio::spawn(echo(stream)); + } + Err(e) => { + println!("accepted connection failed: {e}"); + return; + } + } + } +} + +async fn echo(stream: TcpStream) -> std::io::Result<()> { + // Convert completion-based io to poll-based io(which impl tokio::io) + let mut stream = stream.into_poll_io()?; + let mut buf: Vec = vec![0; 1024]; + let mut res; + loop { + // read + res = stream.read(&mut buf).await?; + if res == 0 { + return Ok(()); + } + + // write all + stream.write_all(&buf[0..res]).await?; + } +} diff --git a/examples/hyper_client.rs b/examples/hyper_client.rs index ce26194a..4bb86539 100644 --- a/examples/hyper_client.rs +++ b/examples/hyper_client.rs @@ -1,127 +1,60 @@ -//! HTTP client example with hyper in compatible mode. +//! HTTP client example with hyper in poll-io mode. //! -//! It will try to fetch http://127.0.0.1:23300/monoio and print the +//! It will try to fetch http://httpbin.org/ip and print the //! response. -//! -//! Note: -//! It is not recommended to use this example as a production code. -//! The `hyper` require `Send` for a future and obviously the future -//! is not `Send` in monoio. So we just use some unsafe code to let -//! it pass which infact not a good solution but the only way to -//! make it work without modifying hyper. -use std::{future::Future, pin::Pin}; +use std::io::Write; -use monoio_compat::TcpStreamCompat; +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::Request; +use monoio::{io::IntoPollIo, net::TcpStream}; +use monoio_compat::hyper::MonoioIo; -#[derive(Clone)] -struct HyperExecutor; +type Result = std::result::Result>; -impl hyper::rt::Executor for HyperExecutor -where - F: Future + 'static, - F::Output: 'static, -{ - fn execute(&self, fut: F) { - monoio::spawn(fut); - } -} +async fn fetch_url(url: hyper::Uri) -> Result<()> { + let host = url.host().expect("uri has no host"); + let port = url.port_u16().unwrap_or(80); + let addr = format!("{}:{}", host, port); + let stream = TcpStream::connect(addr).await?.into_poll_io()?; + let io = MonoioIo::new(stream); -#[derive(Clone)] -struct HyperConnector; + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + monoio::spawn(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); -impl tower_service::Service for HyperConnector { - type Response = HyperConnection; + let authority = url.authority().unwrap().clone(); - type Error = std::io::Error; + let path = url.path(); + let req = Request::builder() + .uri(path) + .header(hyper::header::HOST, authority.as_str()) + .body(Empty::::new())?; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn poll_ready( - &mut self, - _: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } + let mut res = sender.send_request(req).await?; - fn call(&mut self, uri: hyper::Uri) -> Self::Future { - let host = uri.host().unwrap(); - let port = uri.port_u16().unwrap_or(80); - let address = format!("{host}:{port}"); + println!("Response: {}", res.status()); + println!("Headers: {:#?}\n", res.headers()); - #[allow(clippy::type_complexity)] - let b: Pin>>> = - Box::pin(async move { - let conn = monoio::net::TcpStream::connect(address).await?; - let hyper_conn = HyperConnection(TcpStreamCompat::new(conn)); - Ok(hyper_conn) - }); - unsafe { std::mem::transmute(b) } + // Stream the body, writing each chunk to stdout as we get it + // (instead of buffering and printing at the end). + while let Some(next) = res.frame().await { + let frame = next?; + if let Some(chunk) = frame.data_ref() { + std::io::stdout().write_all(chunk)?; + } } -} - -struct HyperConnection(TcpStreamCompat); + println!("\n\nDone!"); -impl tokio::io::AsyncRead for HyperConnection { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } + Ok(()) } -impl tokio::io::AsyncWrite for HyperConnection { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } -} - -impl hyper::client::connect::Connection for HyperConnection { - fn connected(&self) -> hyper::client::connect::Connected { - hyper::client::connect::Connected::new() - } -} - -#[allow(clippy::non_send_fields_in_send_ty)] -unsafe impl Send for HyperConnection {} - #[monoio::main] async fn main() { - println!("Running http client"); - let connector = HyperConnector; - let client = hyper::Client::builder() - .executor(HyperExecutor) - .build::(connector); - let res = client - .get("http://127.0.0.1:23300/monoio".parse().unwrap()) - .await - .expect("failed to fetch"); - println!("Response status: {}", res.status()); - let body = hyper::body::to_bytes(res.into_body()) - .await - .expect("failed to read body"); - let body = - String::from_utf8(body.into_iter().collect()).expect("failed to convert body to string"); - println!("Response body: {body}"); + let url = "http://httpbin.org/ip".parse::().unwrap(); + fetch_url(url).await.unwrap(); } diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index d755fac0..8c494e6e 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -1,15 +1,14 @@ -//! HTTP server example with hyper in compatible mode. +//! HTTP server example with hyper in poll-io mode. //! //! After running this example, you can open http://localhost:23300 //! and http://localhost:23300/monoio in your browser or curl it. -//! Also you can try `hyper_client.rs` example to request it. use std::net::SocketAddr; +use bytes::Bytes; use futures::Future; -use hyper::{server::conn::Http, service::service_fn}; -use monoio::net::TcpListener; -use monoio_compat::TcpStreamCompat; +use hyper::{server::conn::http1, service::service_fn}; +use monoio::{io::IntoPollIo, net::TcpListener}; #[derive(Clone)] struct HyperExecutor; @@ -24,38 +23,48 @@ where } } -pub(crate) async fn serve_http(addr: A, service: S) -> std::io::Result<()> +pub(crate) async fn serve_http(addr: A, service: S) -> std::io::Result<()> where - S: FnMut(Request) -> F + 'static + Copy, - F: Future, R>> + 'static, - R: std::error::Error + 'static + Send + Sync, + S: Copy + Fn(Request) -> F + 'static, + F: Future>, E>> + 'static, + E: std::error::Error + 'static + Send + Sync, A: Into, { let listener = TcpListener::bind(addr.into())?; loop { let (stream, _) = listener.accept().await?; - monoio::spawn( - Http::new() - .with_executor(HyperExecutor) - .serve_connection(TcpStreamCompat::new(stream), service_fn(service)), - ); + let stream_poll = monoio_compat::hyper::MonoioIo::new(stream.into_poll_io()?); + monoio::spawn(async move { + // Handle the connection from the client using HTTP1 and pass any + // HTTP requests received on that connection to the `hello` function + if let Err(err) = http1::Builder::new() + .timer(monoio_compat::hyper::MonoioTimer) + .serve_connection(stream_poll, service_fn(service)) + .await + { + println!("Error serving connection: {:?}", err); + } + }); } } -use hyper::{Body, Method, Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::{Method, Request, Response, StatusCode}; -async fn hyper_handler(req: Request) -> Result, std::convert::Infallible> { +async fn hyper_handler( + req: Request, +) -> Result>, std::convert::Infallible> { match (req.method(), req.uri().path()) { - (&Method::GET, "/") => Ok(Response::new(Body::from("Hello World!"))), - (&Method::GET, "/monoio") => Ok(Response::new(Body::from("Hello Monoio!"))), + (&Method::GET, "/") => Ok(Response::new(Full::new(Bytes::from("Hello World!")))), + (&Method::GET, "/monoio") => Ok(Response::new(Full::new(Bytes::from("Hello Monoio!")))), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::from("404 not found")) + .body(Full::new(Bytes::from("404 not found"))) .unwrap()), } } -#[monoio::main(threads = 2)] +#[monoio::main(threads = 2, timer_enabled = true)] async fn main() { println!("Running http server on 0.0.0.0:23300"); let _ = serve_http(([0, 0, 0, 0], 23300), hyper_handler).await; diff --git a/examples/tokio-io-compat/Cargo.toml b/examples/tokio-io-compat/Cargo.toml deleted file mode 100644 index 293df9fd..00000000 --- a/examples/tokio-io-compat/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -edition = "2021" -license = "MIT/Apache-2.0" -name = "tokio-io-compat" -publish = false -version = "0.0.0" - -[dependencies] -# legacy and tokio-compat enabled, iouring disabled. -monoio = {path = "../../monoio", default-features = false, features = ["async-cancel", "sync", "bytes", "legacy", "macros", "utils", "tokio-compat"]} - -hyper = {version = "0.14", features = ["http1", "client", "server", "stream"]} -tokio = {version = "1", default-features = false, features = ["io-util"]} -tower-service = "0.3" diff --git a/examples/tokio-io-compat/src/main.rs b/examples/tokio-io-compat/src/main.rs deleted file mode 100644 index bffdbe9b..00000000 --- a/examples/tokio-io-compat/src/main.rs +++ /dev/null @@ -1,130 +0,0 @@ -//! HTTP client example with hyper in compatible mode(without cost). -//! -//! It will try to fetch https://www.bytedance.com/ and print the -//! response. -//! -//! It looks like the `hyper_client.rs` in example. The difference is -//! in this version the tokio AsyncRead and AsyncWrite is implemented -//! without additional cost because we only enable legacy feature. - -use std::{future::Future, pin::Pin}; - -#[derive(Clone)] -struct HyperExecutor; - -impl hyper::rt::Executor for HyperExecutor -where - F: Future + 'static, - F::Output: 'static, -{ - #[inline] - fn execute(&self, fut: F) { - monoio::spawn(fut); - } -} - -#[derive(Clone)] -struct HyperConnector; - -impl tower_service::Service for HyperConnector { - type Response = HyperConnection; - - type Error = std::io::Error; - - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - #[inline] - fn poll_ready( - &mut self, - _: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - - fn call(&mut self, uri: hyper::Uri) -> Self::Future { - let host = uri.host().unwrap(); - let port = uri.port_u16().unwrap_or(80); - let address = format!("{host}:{port}"); - - #[allow(clippy::type_complexity)] - let b: Pin>>> = - Box::pin(async move { - let conn = monoio::net::TcpStream::connect(address).await?; - let hyper_conn = HyperConnection(conn); - Ok(hyper_conn) - }); - // Use transmust to make future Send(infact it is not) - unsafe { std::mem::transmute(b) } - } -} - -struct HyperConnection(monoio::net::TcpStream); - -impl tokio::io::AsyncRead for HyperConnection { - #[inline] - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} - -impl tokio::io::AsyncWrite for HyperConnection { - #[inline] - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - #[inline] - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - #[inline] - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } -} - -impl hyper::client::connect::Connection for HyperConnection { - #[inline] - fn connected(&self) -> hyper::client::connect::Connected { - hyper::client::connect::Connected::new() - } -} - -#[allow(clippy::non_send_fields_in_send_ty)] -unsafe impl Send for HyperConnection {} - -#[monoio::main] -async fn main() { - println!("Running http client"); - let connector = HyperConnector; - let client = hyper::Client::builder() - .executor(HyperExecutor) - .build::(connector); - let res = client - .get("https://www.bytedance.com/".parse().unwrap()) - .await - .expect("failed to fetch"); - println!("Response status: {}", res.status()); - let body = hyper::body::to_bytes(res.into_body()) - .await - .expect("failed to read body"); - let body = - String::from_utf8(body.into_iter().collect()).expect("failed to convert body to string"); - println!("Response body: {body}"); -} diff --git a/monoio-compat/Cargo.toml b/monoio-compat/Cargo.toml index c116d854..dbb35ac0 100644 --- a/monoio-compat/Cargo.toml +++ b/monoio-compat/Cargo.toml @@ -4,16 +4,26 @@ categories = ["asynchronous", "network-programming"] description = "A compat wrapper for monoio." edition = "2021" keywords = ["runtime", "iouring", "async"] -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" name = "monoio-compat" readme = "README.md" repository = "https://github.com/bytedance/monoio" version = "0.2.1" [dependencies] -monoio = {version = "0.2.0", path = "../monoio", default-features = false} +monoio = { version = "0.2.0", path = "../monoio", default-features = false } reusable-box-future = "0.2" -tokio = {version = "1", default-features = false, features = ["io-util"]} +tokio = { version = "1", default-features = false, features = ["io-util"] } + +hyper = { version = "1.1", optional = true } +pin-project-lite = { version = "0.2", optional = true } [dev-dependencies] -monoio = {version = "0.2.0", path = "../monoio", features = ["async-cancel", "macros"]} +monoio = { version = "0.2.0", path = "../monoio", features = [ + "async-cancel", + "macros", +] } + +[features] +# this is an experimental feature +hyper = ["dep:hyper", "dep:pin-project-lite", "monoio/poll-io"] diff --git a/monoio-compat/src/hyper.rs b/monoio-compat/src/hyper.rs new file mode 100644 index 00000000..1501f9a8 --- /dev/null +++ b/monoio-compat/src/hyper.rs @@ -0,0 +1,241 @@ +use std::{ + future::Future, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use hyper::rt::{Executor, Sleep, Timer}; +use pin_project_lite::pin_project; + +pub struct MonoioExecutor; +impl Executor for MonoioExecutor +where + F: std::future::Future + 'static, + F::Output: 'static, +{ + #[inline] + fn execute(&self, fut: F) { + monoio::spawn(fut); + } +} + +#[derive(Clone, Debug)] +pub struct MonoioTimer; +impl Timer for MonoioTimer { + #[inline] + fn sleep(&self, duration: Duration) -> Pin> { + Box::pin(MonoioSleep { + inner: monoio::time::sleep(duration), + }) + } + + #[inline] + fn sleep_until(&self, deadline: Instant) -> Pin> { + Box::pin(MonoioSleep { + inner: monoio::time::sleep_until(deadline.into()), + }) + } + + #[inline] + fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { + if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { + sleep.reset(new_deadline) + } + } +} + +pin_project! { + #[derive(Debug)] + struct MonoioSleep { + #[pin] + inner: monoio::time::Sleep, + } +} + +impl Future for MonoioSleep { + type Output = (); + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } +} + +impl Sleep for MonoioSleep {} +unsafe impl Send for MonoioSleep {} +unsafe impl Sync for MonoioSleep {} + +impl MonoioSleep { + #[inline] + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.project().inner.as_mut().reset(deadline.into()); + } +} + +pin_project! { + #[derive(Debug)] + pub struct MonoioIo { + #[pin] + inner: T, + } +} + +impl MonoioIo { + pub const fn new(inner: T) -> Self { + Self { inner } + } + + #[inline] + pub fn inner(self) -> T { + self.inner + } +} +impl Deref for MonoioIo { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl DerefMut for MonoioIo { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} +impl hyper::rt::Read for MonoioIo +where + T: monoio::io::poll_io::AsyncRead, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = monoio::io::poll_io::ReadBuf::uninit(buf.as_mut()); + match monoio::io::poll_io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for MonoioIo +where + T: monoio::io::poll_io::AsyncWrite, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + monoio::io::poll_io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + monoio::io::poll_io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + monoio::io::poll_io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + monoio::io::poll_io::AsyncWrite::is_write_vectored(&self.inner) + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + monoio::io::poll_io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for MonoioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + // let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for MonoioIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} diff --git a/monoio-compat/src/lib.rs b/monoio-compat/src/lib.rs index 8034381d..b0ece575 100644 --- a/monoio-compat/src/lib.rs +++ b/monoio-compat/src/lib.rs @@ -8,6 +8,9 @@ mod buf; mod safe_wrapper; mod tcp_unsafe; +#[cfg(feature = "hyper")] +pub mod hyper; + pub use safe_wrapper::StreamWrapper; pub use tcp_unsafe::TcpStreamCompat as TcpStreamCompatUnsafe; pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; diff --git a/monoio-macros/Cargo.toml b/monoio-macros/Cargo.toml index 4009be39..f7edcc48 100644 --- a/monoio-macros/Cargo.toml +++ b/monoio-macros/Cargo.toml @@ -4,7 +4,7 @@ categories = ["asynchronous"] description = "Monoio proc macros." edition = "2021" keywords = ["runtime", "iouring", "async"] -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" name = "monoio-macros" readme = "README.md" repository = "https://github.com/bytedance/monoio" diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index cce8bb88..74bfc0df 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -4,11 +4,11 @@ categories = ["asynchronous", "network-programming"] description = "A thread per core runtime based on iouring." edition = "2021" keywords = ["runtime", "iouring", "async"] -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" name = "monoio" readme = "../README.md" repository = "https://github.com/bytedance/monoio" -version = "0.2.1" +version = "0.2.2" # common dependencies [dependencies] @@ -19,10 +19,10 @@ fxhash = "0.2" libc = "0.2" pin-project-lite = "0.2" socket2 = { version = "0.5", features = ["all"] } -once_cell = "1.19.0" +memchr = "2.7" bytes = { version = "1", optional = true } -flume = { version = "0.10", optional = true } +flume = { version = "0.11", optional = true } mio = { version = "0.8", features = [ "net", "os-poll", @@ -34,7 +34,8 @@ tracing = { version = "0.1", default-features = false, features = [ "std", ], optional = true } ctrlc = { version = "3", optional = true } -memchr = "2.6" +lazy_static = { version = "1", optional = true } +once_cell = { version = "1.19.0", optional = true } # windows dependencies(will be added when windows support finished) [target.'cfg(windows)'.dependencies.windows-sys] @@ -66,7 +67,7 @@ splice = [] # enable `async main` macros support macros = ["monoio-macros"] # allow waker to be sent across threads -sync = ["flume", "threadpool"] +sync = ["flume", "threadpool", "once_cell"] # enable bind cpu set utils = ["nix"] # enable debug if you want to know what runtime does @@ -77,6 +78,8 @@ legacy = ["mio"] iouring = [] # tokio-compatible(only have effect when legacy is enabled and iouring is not) tokio-compat = ["tokio"] +# (experimental)enable poll-io to convert structs to structs that impl tokio's poll io +poll-io = ["tokio", "mio"] # signal enables setting ctrl_c handler signal = ["ctrlc", "sync"] signal-termination = ["signal", "ctrlc/termination"] diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index 7d3469d7..d28d0dbf 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -137,6 +137,7 @@ impl DefaultThreadPool { } impl ThreadPool for DefaultThreadPool { + #[inline] fn schedule_task(&self, task: BlockingTask) { self.pool.execute(move || task.run()); } diff --git a/monoio/src/buf/raw_buf.rs b/monoio/src/buf/raw_buf.rs index 912bc409..6828de67 100644 --- a/monoio/src/buf/raw_buf.rs +++ b/monoio/src/buf/raw_buf.rs @@ -31,7 +31,7 @@ impl RawBuf { /// # Safety /// make sure the pointer and length is valid when RawBuf is used. #[inline] - pub unsafe fn new(ptr: *const u8, len: usize) -> Self { + pub const unsafe fn new(ptr: *const u8, len: usize) -> Self { Self { ptr, len } } } @@ -126,12 +126,12 @@ impl RawBufVectored { /// make sure the pointer and length is valid when RawBuf is used. #[cfg(unix)] #[inline] - pub unsafe fn new(ptr: *const libc::iovec, len: usize) -> Self { + pub const unsafe fn new(ptr: *const libc::iovec, len: usize) -> Self { Self { ptr, len } } #[cfg(windows)] #[inline] - pub unsafe fn new(ptr: *const WSABUF, len: usize) -> Self { + pub const unsafe fn new(ptr: *const WSABUF, len: usize) -> Self { Self { ptr, len } } } diff --git a/monoio/src/buf/slice.rs b/monoio/src/buf/slice.rs index 6ff70f6d..fac2e864 100644 --- a/monoio/src/buf/slice.rs +++ b/monoio/src/buf/slice.rs @@ -32,6 +32,7 @@ pub struct SliceMut { impl SliceMut { /// Create a SliceMut from a buffer and range. + #[inline] pub fn new(mut buf: T, begin: usize, end: usize) -> Self { assert!(end <= buf.bytes_total()); assert!(begin <= buf.bytes_init()); @@ -46,7 +47,7 @@ impl SliceMut { /// # Safety /// begin must be initialized, and end must be within the buffer capacity. #[inline] - pub unsafe fn new_unchecked(buf: T, begin: usize, end: usize) -> Self { + pub const unsafe fn new_unchecked(buf: T, begin: usize, end: usize) -> Self { Self { buf, begin, end } } @@ -63,7 +64,7 @@ impl SliceMut { /// assert_eq!(1, slice.begin()); /// ``` #[inline] - pub fn begin(&self) -> usize { + pub const fn begin(&self) -> usize { self.begin } @@ -80,7 +81,7 @@ impl SliceMut { /// assert_eq!(5, slice.end()); /// ``` #[inline] - pub fn end(&self) -> usize { + pub const fn end(&self) -> usize { self.end } @@ -100,7 +101,7 @@ impl SliceMut { /// assert_eq!(&slice[..], b"hello"); /// ``` #[inline] - pub fn get_ref(&self) -> &T { + pub const fn get_ref(&self) -> &T { &self.buf } @@ -139,6 +140,7 @@ impl SliceMut { /// let buf = slice.into_inner(); /// assert_eq!(buf, b"hello world"); /// ``` + #[inline] pub fn into_inner(self) -> T { self.buf } @@ -208,25 +210,25 @@ impl Slice { /// # Safety /// begin and end must be within the buffer initialized range. #[inline] - pub unsafe fn new_unchecked(buf: T, begin: usize, end: usize) -> Self { + pub const unsafe fn new_unchecked(buf: T, begin: usize, end: usize) -> Self { Self { buf, begin, end } } /// Offset in the underlying buffer at which this slice starts. #[inline] - pub fn begin(&self) -> usize { + pub const fn begin(&self) -> usize { self.begin } /// Ofset in the underlying buffer at which this slice ends. #[inline] - pub fn end(&self) -> usize { + pub const fn end(&self) -> usize { self.end } /// Gets a reference to the underlying buffer. #[inline] - pub fn get_ref(&self) -> &T { + pub const fn get_ref(&self) -> &T { &self.buf } @@ -263,6 +265,7 @@ pub struct IoVecWrapper { impl IoVecWrapper { /// Create a new IoVecWrapper with something that impl IoVecBuf. + #[inline] pub fn new(buf: T) -> Result { #[cfg(unix)] if buf.read_iovec_len() == 0 { @@ -276,6 +279,7 @@ impl IoVecWrapper { } /// Consume self and return raw iovec buf. + #[inline] pub fn into_inner(self) -> T { self.raw } @@ -319,6 +323,7 @@ pub struct IoVecWrapperMut { impl IoVecWrapperMut { /// Create a new IoVecWrapperMut with something that impl IoVecBufMut. + #[inline] pub fn new(mut iovec_buf: T) -> Result { if iovec_buf.write_iovec_len() == 0 { return Err(iovec_buf); @@ -327,6 +332,7 @@ impl IoVecWrapperMut { } /// Consume self and return raw iovec buf. + #[inline] pub fn into_inner(self) -> T { self.raw } diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index a92a7024..8a351a56 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -8,17 +8,16 @@ use std::{ time::Duration, }; -use self::{ready::Ready, scheduled_io::ScheduledIo}; use super::{ op::{CompletionMeta, Op, OpAble}, + ready::{self, Ready}, + scheduled_io::ScheduledIo, Driver, Inner, CURRENT, }; use crate::utils::slab::Slab; #[cfg(windows)] pub(super) mod iocp; -pub(crate) mod ready; -mod scheduled_io; #[cfg(feature = "sync")] mod waker; @@ -28,11 +27,11 @@ pub(crate) use waker::UnparkHandle; pub(crate) struct LegacyInner { pub(crate) io_dispatch: Slab, #[cfg(unix)] - events: Option, + events: mio::Events, #[cfg(unix)] poll: mio::Poll, #[cfg(windows)] - events: Option, + events: iocp::Events, #[cfg(windows)] poll: iocp::Poller, @@ -87,11 +86,11 @@ impl LegacyDriver { let inner = LegacyInner { io_dispatch: Slab::new(), #[cfg(unix)] - events: Some(mio::Events::with_capacity(entries as usize)), + events: mio::Events::with_capacity(entries as usize), #[cfg(unix)] poll, #[cfg(windows)] - events: Some(iocp::Events::with_capacity(entries as usize)), + events: iocp::Events::with_capacity(entries as usize), #[cfg(windows)] poll, #[cfg(feature = "sync")] @@ -149,7 +148,7 @@ impl LegacyDriver { } // here we borrow 2 mut self, but its safe. - let events = unsafe { (*self.inner.get()).events.as_mut().unwrap_unchecked() }; + let events = unsafe { &mut (*self.inner.get()).events }; match inner.poll.poll(events, timeout) { Ok(_) => {} Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} @@ -213,8 +212,7 @@ impl LegacyDriver { interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; - let io = ScheduledIo::default(); - let token = inner.io_dispatch.insert(io); + let token = inner.io_dispatch.insert(ScheduledIo::new()); let registry = inner.poll.registry(); match registry.register(source, mio::Token(token), interest) { @@ -279,37 +277,33 @@ impl LegacyInner { // wait io ready and do syscall let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost"); let ref_mut = scheduled_io.as_mut(); - loop { - let readiness = ready!(ref_mut.poll_readiness(cx, direction)); - // check if canceled - if readiness.is_canceled() { - // clear CANCELED part only - ref_mut.clear_readiness(readiness & Ready::CANCELED); - return Poll::Ready(CompletionMeta { - result: Err(io::Error::from_raw_os_error(125)), - flags: 0, - }); - } + let readiness = ready!(ref_mut.poll_readiness(cx, direction)); + + // check if canceled + if readiness.is_canceled() { + // clear CANCELED part only + ref_mut.clear_readiness(readiness & Ready::CANCELED); + return Poll::Ready(CompletionMeta { + result: Err(io::Error::from_raw_os_error(125)), + flags: 0, + }); + } - match OpAble::legacy_call(data) { - Ok(n) => { - return Poll::Ready(CompletionMeta { - result: Ok(n), - flags: 0, - }) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - ref_mut.clear_readiness(direction.mask()); - continue; - } - Err(e) => { - return Poll::Ready(CompletionMeta { - result: Err(e), - flags: 0, - }) - } + match OpAble::legacy_call(data) { + Ok(n) => Poll::Ready(CompletionMeta { + result: Ok(n), + flags: 0, + }), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + ref_mut.clear_readiness(direction.mask()); + ref_mut.set_waker(cx, direction); + Poll::Pending } + Err(e) => Poll::Ready(CompletionMeta { + result: Err(e), + flags: 0, + }), } } diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index cf443303..6aeebe99 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -1,6 +1,11 @@ /// Monoio Driver. -// #[cfg(unix)] pub(crate) mod op; +#[cfg(feature = "poll-io")] +pub(crate) mod poll; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +pub(crate) mod ready; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +pub(crate) mod scheduled_io; pub(crate) mod shared_fd; #[cfg(feature = "sync")] pub(crate) mod thread; @@ -20,14 +25,6 @@ use std::{ #[cfg(feature = "legacy")] pub use self::legacy::LegacyDriver; -// #[cfg(windows)] -// pub mod op { -// pub struct CompletionMeta {} -// pub struct Op { -// pub data: T, -// } -// pub trait OpAble {} -// } #[cfg(feature = "legacy")] use self::legacy::LegacyInner; use self::op::{CompletionMeta, Op, OpAble}; @@ -88,6 +85,7 @@ pub trait Driver { scoped_thread_local!(pub(crate) static CURRENT: Inner); +#[derive(Clone)] pub(crate) enum Inner { #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(std::rc::Rc>), @@ -138,6 +136,29 @@ impl Inner { } } + #[cfg(feature = "poll-io")] + fn poll_legacy_op( + &self, + data: &mut T, + cx: &mut Context<'_>, + ) -> Poll { + match self { + #[cfg(windows)] + _ => unimplemented!(), + #[cfg(all(target_os = "linux", feature = "iouring"))] + Inner::Uring(this) => UringInner::poll_legacy_op(this, data, cx), + #[cfg(feature = "legacy")] + Inner::Legacy(this) => LegacyInner::poll_op::(this, data, cx), + #[cfg(all( + not(feature = "legacy"), + not(all(target_os = "linux", feature = "iouring")) + ))] + _ => { + util::feature_panic(); + } + } + } + #[allow(unused)] fn drop_op(&self, index: usize, data: &mut Option) { match self { diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index fa8a8c31..b3902320 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -53,24 +53,29 @@ pub(crate) trait OpAble { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry; - #[cfg(feature = "legacy")] - fn legacy_interest(&self) -> Option<(super::legacy::ready::Direction, usize)>; - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + fn legacy_interest(&self) -> Option<(super::ready::Direction, usize)>; + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result; } /// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way. /// This can provide better compatibility for crates programmed in poll-like way. -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) trait PollLegacy { + #[cfg(feature = "legacy")] fn poll_legacy(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll; + #[cfg(feature = "poll-io")] + fn poll_io(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll; } -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] impl PollLegacy for T where T: OpAble, { + #[cfg(feature = "legacy")] + #[inline] fn poll_legacy(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll { #[cfg(all(feature = "iouring", feature = "tokio-compat"))] unsafe { @@ -85,6 +90,12 @@ where #[cfg(not(all(feature = "iouring", feature = "tokio-compat")))] driver::CURRENT.with(|this| this.poll_op(self, 0, _cx)) } + + #[cfg(feature = "poll-io")] + #[inline] + fn poll_io(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll { + driver::CURRENT.with(|this| this.poll_legacy_op(self, cx)) + } } impl Op { @@ -164,12 +175,14 @@ impl Drop for Op { /// Check if current driver is legacy. #[allow(unused)] #[cfg(not(target_os = "linux"))] -pub fn is_legacy() -> bool { +#[inline] +pub const fn is_legacy() -> bool { true } /// Check if current driver is legacy. #[cfg(target_os = "linux")] +#[inline] pub fn is_legacy() -> bool { super::CURRENT.with(|inner| inner.is_legacy()) } @@ -178,7 +191,7 @@ pub fn is_legacy() -> bool { pub(crate) struct OpCanceller { pub(super) index: usize, #[cfg(feature = "legacy")] - pub(super) direction: Option, + pub(super) direction: Option, } impl OpCanceller { diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index 4e708e49..596a8399 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -13,12 +13,12 @@ use { accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE, }, }; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; /// Accept pub(crate) struct Accept { @@ -62,12 +62,13 @@ impl OpAble for Accept { .build() } - #[cfg(feature = "legacy")] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(windows)] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); let addr = self.addr.0.as_mut_ptr() as *mut _; @@ -76,7 +77,7 @@ impl OpAble for Accept { syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let addr = self.addr.0.as_mut_ptr() as *mut _; diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs index 9048fd15..19557c5f 100644 --- a/monoio/src/driver/op/close.rs +++ b/monoio/src/driver/op/close.rs @@ -11,10 +11,6 @@ use { }; use super::{Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; -#[cfg(all(unix, feature = "legacy"))] -use crate::syscall_u32; pub(crate) struct Close { #[cfg(unix)] @@ -42,15 +38,16 @@ impl OpAble for Close { opcode::Close::new(types::Fd(self.fd)).build() } - #[cfg(feature = "legacy")] - fn legacy_interest(&self) -> Option<(Direction, usize)> { + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] + fn legacy_interest(&self) -> Option<(crate::driver::ready::Direction, usize)> { None } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { #[cfg(unix)] - return syscall_u32!(close(self.fd)); + return crate::syscall_u32!(close(self.fd)); #[cfg(windows)] return syscall!(closesocket(self.fd), PartialEq::ne, 0); diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs index c6a8b25d..6fcd60c1 100644 --- a/monoio/src/driver/op/connect.rs +++ b/monoio/src/driver/op/connect.rs @@ -9,8 +9,8 @@ use windows_sys::Win32::Networking::WinSock::{ }; use super::{super::shared_fd::SharedFd, Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; pub(crate) struct Connect { pub(crate) fd: SharedFd, @@ -52,12 +52,13 @@ impl OpAble for Connect { .build() } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { // For ios/macos, if tfo is enabled, we will // call connectx here. @@ -145,12 +146,13 @@ impl OpAble for ConnectUnix { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { match crate::syscall_u32!(connect( self.fd.raw_fd(), diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs index f23c1de5..db363b3d 100644 --- a/monoio/src/driver/op/fsync.rs +++ b/monoio/src/driver/op/fsync.rs @@ -6,11 +6,11 @@ use io_uring::{opcode, types}; use windows_sys::Win32::Storage::FileSystem::FlushFileBuffers; use super::{super::shared_fd::SharedFd, Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; #[cfg(windows)] use crate::syscall; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::syscall_u32; pub(crate) struct Fsync { @@ -48,12 +48,13 @@ impl OpAble for Fsync { opc.build() } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(all(windows, feature = "legacy"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { syscall!( FlushFileBuffers(self.handle.as_raw_handle()), @@ -62,17 +63,15 @@ impl OpAble for Fsync { ) } - #[cfg(all(unix, not(target_os = "linux"), feature = "legacy"))] - fn legacy_call(&mut self) -> io::Result { - syscall_u32!(fsync(self.fd.raw_fd())) - } - - #[cfg(all(target_os = "linux", feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { + #[cfg(target_os = "linux")] if self.data_sync { syscall_u32!(fdatasync(self.fd.raw_fd())) } else { syscall_u32!(fsync(self.fd.raw_fd())) } + #[cfg(not(target_os = "linux"))] + syscall_u32!(fsync(self.fd.raw_fd())) } } diff --git a/monoio/src/driver/op/open.rs b/monoio/src/driver/op/open.rs index a9b158b7..ad2badc9 100644 --- a/monoio/src/driver/op/open.rs +++ b/monoio/src/driver/op/open.rs @@ -6,11 +6,11 @@ use io_uring::{opcode, types}; use windows_sys::Win32::{Foundation::INVALID_HANDLE_VALUE, Storage::FileSystem::CreateFileW}; use super::{Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; #[cfg(windows)] use crate::syscall; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::syscall_u32; use crate::{driver::util::cstr, fs::OpenOptions}; @@ -62,12 +62,13 @@ impl OpAble for Open { .build() } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { None } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] fn legacy_call(&mut self) -> io::Result { syscall_u32!(open( self.path.as_c_str().as_ptr(), @@ -76,7 +77,7 @@ impl OpAble for Open { )) } - #[cfg(windows)] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { syscall!( CreateFileW( diff --git a/monoio/src/driver/op/poll.rs b/monoio/src/driver/op/poll.rs index 4069bb02..1d8bc2c4 100644 --- a/monoio/src/driver/op/poll.rs +++ b/monoio/src/driver/op/poll.rs @@ -13,8 +13,8 @@ use windows_sys::Win32::Networking::WinSock::{ }; use super::{super::shared_fd::SharedFd, Op, OpAble}; -#[cfg(feature = "legacy")] -use crate::driver::legacy::ready::Direction; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; pub(crate) struct PollAdd { /// Holds a strong ref to the FD, preventing the file from being closed @@ -23,7 +23,7 @@ pub(crate) struct PollAdd { fd: SharedFd, // true: read; false: write is_read: bool, - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: bool, } @@ -32,7 +32,7 @@ impl Op { Op::submit_with(PollAdd { fd: fd.clone(), is_read: true, - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: _relaxed, }) } @@ -41,7 +41,7 @@ impl Op { Op::submit_with(PollAdd { fd: fd.clone(), is_read: false, - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: _relaxed, }) } @@ -66,7 +66,8 @@ impl OpAble for PollAdd { .build() } - #[cfg(feature = "legacy")] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| { ( @@ -80,7 +81,7 @@ impl OpAble for PollAdd { }) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] fn legacy_call(&mut self) -> io::Result { if !self.relaxed { use std::{io::ErrorKind, os::fd::AsRawFd}; @@ -102,7 +103,7 @@ impl OpAble for PollAdd { Ok(0) } - #[cfg(windows)] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { if !self.relaxed { let mut pollfd = WSAPOLLFD { diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index 9ca340ff..bb6dc842 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -2,9 +2,9 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, + crate::{driver::ready::Direction, syscall_u32}, std::os::unix::prelude::AsRawFd, }; @@ -66,12 +66,13 @@ impl OpAble for Read { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) @@ -132,12 +133,13 @@ impl OpAble for ReadVec { opcode::Readv::new(types::Fd(self.fd.raw_fd()), ptr, len).build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { syscall_u32!(readv( self.fd.raw_fd(), diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index ce6ecc8f..b0609d65 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -6,9 +6,9 @@ use std::{ #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, + crate::{driver::ready::Direction, syscall_u32}, std::os::unix::prelude::AsRawFd, }; @@ -64,12 +64,13 @@ impl OpAble for Recv { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recv( @@ -170,12 +171,13 @@ impl OpAble for RecvMsg { opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) @@ -248,12 +250,13 @@ impl OpAble for RecvMsgUnix { opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index 355cf6a0..e2d692ee 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -3,9 +3,9 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; use socket2::SockAddr; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, + crate::{driver::ready::Direction, syscall_u32}, std::os::unix::prelude::AsRawFd, }; @@ -76,14 +76,15 @@ impl OpAble for Send { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd .registered_index() .map(|idx| (Direction::Write, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); #[cfg(target_os = "linux")] @@ -162,14 +163,15 @@ impl OpAble for SendMsg { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd .registered_index() .map(|idx| (Direction::Write, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] #[allow(deprecated)] @@ -242,14 +244,16 @@ impl OpAble for SendMsgUnix { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd .registered_index() .map(|idx| (Direction::Write, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] #[allow(deprecated)] diff --git a/monoio/src/driver/op/splice.rs b/monoio/src/driver/op/splice.rs index e4f0da72..21940576 100644 --- a/monoio/src/driver/op/splice.rs +++ b/monoio/src/driver/op/splice.rs @@ -6,7 +6,7 @@ use std::io; use io_uring::{opcode, types}; #[cfg(all(unix, feature = "legacy"))] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, + crate::{driver::ready::Direction, syscall_u32}, std::os::unix::prelude::AsRawFd, }; @@ -73,6 +73,7 @@ impl OpAble for Splice { } #[cfg(all(unix, feature = "legacy"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { match self.direction { SpliceDirection::FromPipe => self diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index bfe38a87..c2d93c98 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -2,9 +2,9 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] use { - crate::{driver::legacy::ready::Direction, syscall_u32}, + crate::{driver::ready::Direction, syscall_u32}, std::os::unix::prelude::AsRawFd, }; @@ -51,14 +51,15 @@ impl OpAble for Write { .build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd .registered_index() .map(|idx| (Direction::Write, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) @@ -120,14 +121,15 @@ impl OpAble for WriteVec { opcode::Writev::new(types::Fd(self.fd.raw_fd()), ptr, len).build() } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd .registered_index() .map(|idx| (Direction::Write, idx)) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { syscall_u32!(writev( self.fd.raw_fd(), diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs new file mode 100644 index 00000000..960503c2 --- /dev/null +++ b/monoio/src/driver/poll.rs @@ -0,0 +1,108 @@ +use std::{io, os::fd::AsRawFd, task::Context, time::Duration}; + +use super::{ready::Direction, scheduled_io::ScheduledIo}; +use crate::{driver::op::CompletionMeta, utils::slab::Slab}; + +/// Poller with io dispatch. +// TODO: replace legacy impl with this Poll. +pub(crate) struct Poll { + pub(crate) io_dispatch: Slab, + poll: mio::Poll, + events: mio::Events, +} + +impl Poll { + #[inline] + pub(crate) fn with_capacity(capacity: usize) -> io::Result { + Ok(Self { + io_dispatch: Slab::new(), + poll: mio::Poll::new()?, + events: mio::Events::with_capacity(capacity), + }) + } + + #[inline] + pub(crate) fn tick(&mut self, timeout: Option) -> io::Result<()> { + match self.poll.poll(&mut self.events, timeout) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + for event in self.events.iter() { + let token = event.token(); + + if let Some(mut sio) = self.io_dispatch.get(token.0) { + let ref_mut = sio.as_mut(); + let ready = super::ready::Ready::from_mio(event); + ref_mut.set_readiness(|curr| curr | ready); + ref_mut.wake(ready); + } + } + Ok(()) + } + + pub(crate) fn register( + &mut self, + source: &mut impl mio::event::Source, + interest: mio::Interest, + ) -> io::Result { + let token = self.io_dispatch.insert(ScheduledIo::new()); + let registry = self.poll.registry(); + match registry.register(source, mio::Token(token), interest) { + Ok(_) => Ok(token), + Err(e) => { + self.io_dispatch.remove(token); + Err(e) + } + } + } + + pub(crate) fn deregister( + &mut self, + source: &mut impl mio::event::Source, + token: usize, + ) -> io::Result<()> { + match self.poll.registry().deregister(source) { + Ok(_) => { + self.io_dispatch.remove(token); + Ok(()) + } + Err(e) => Err(e), + } + } + + #[inline] + pub(crate) fn poll_syscall( + &mut self, + cx: &mut Context<'_>, + token: usize, + direction: Direction, + syscall: impl FnOnce() -> io::Result, + ) -> std::task::Poll { + let mut scheduled_io = self.io_dispatch.get(token).expect("scheduled_io lost"); + let ref_mut = scheduled_io.as_mut(); + ready!(ref_mut.poll_readiness(cx, direction)); + match syscall() { + Ok(n) => std::task::Poll::Ready(CompletionMeta { + result: Ok(n), + flags: 0, + }), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + ref_mut.clear_readiness(direction.mask()); + ref_mut.set_waker(cx, direction); + std::task::Poll::Pending + } + Err(e) => std::task::Poll::Ready(CompletionMeta { + result: Err(e), + flags: 0, + }), + } + } +} + +impl AsRawFd for Poll { + #[inline] + fn as_raw_fd(&self) -> std::os::fd::RawFd { + self.poll.as_raw_fd() + } +} diff --git a/monoio/src/driver/legacy/ready.rs b/monoio/src/driver/ready.rs similarity index 98% rename from monoio/src/driver/legacy/ready.rs rename to monoio/src/driver/ready.rs index 374ddcd5..38d6a7a7 100644 --- a/monoio/src/driver/legacy/ready.rs +++ b/monoio/src/driver/ready.rs @@ -128,6 +128,7 @@ impl Ready { self.contains(Ready::WRITE_CLOSED) } + #[allow(dead_code)] pub(crate) fn is_canceled(self) -> bool { !(self & Ready::CANCELED).is_empty() } @@ -240,3 +241,5 @@ impl Direction { } } } + +pub(crate) const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); diff --git a/monoio/src/driver/legacy/scheduled_io.rs b/monoio/src/driver/scheduled_io.rs similarity index 86% rename from monoio/src/driver/legacy/scheduled_io.rs rename to monoio/src/driver/scheduled_io.rs index 7b3b713d..5dfdfbd5 100644 --- a/monoio/src/driver/legacy/scheduled_io.rs +++ b/monoio/src/driver/scheduled_io.rs @@ -12,25 +12,33 @@ pub(crate) struct ScheduledIo { } impl Default for ScheduledIo { + #[inline] fn default() -> Self { + Self::new() + } +} + +impl ScheduledIo { + pub(crate) const fn new() -> Self { Self { readiness: Ready::EMPTY, reader: None, writer: None, } } -} -impl ScheduledIo { #[allow(unused)] + #[inline] pub(crate) fn set_writable(&mut self) { self.readiness |= Ready::WRITABLE; } + #[inline] pub(crate) fn set_readiness(&mut self, f: impl Fn(Ready) -> Ready) { self.readiness = f(self.readiness); } + #[inline] pub(crate) fn wake(&mut self, ready: Ready) { if ready.is_readable() { if let Some(waker) = self.reader.take() { @@ -44,11 +52,13 @@ impl ScheduledIo { } } + #[inline] pub(crate) fn clear_readiness(&mut self, ready: Ready) { self.readiness = self.readiness - ready; } #[allow(clippy::needless_pass_by_ref_mut)] + #[inline] pub(crate) fn poll_readiness( &mut self, cx: &mut Context<'_>, @@ -58,6 +68,12 @@ impl ScheduledIo { if !ready.is_empty() { return Poll::Ready(ready); } + self.set_waker(cx, direction); + Poll::Pending + } + + #[inline] + pub(crate) fn set_waker(&mut self, cx: &mut Context<'_>, direction: Direction) { let slot = match direction { Direction::Read => &mut self.reader, Direction::Write => &mut self.writer, @@ -72,6 +88,5 @@ impl ScheduledIo { *slot = Some(cx.waker().clone()); } } - Poll::Pending } } diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 5859b99a..ddf719ce 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -5,7 +5,7 @@ use std::os::windows::io::{AsRawSocket, FromRawSocket, OwnedSocket, RawSocket}; use std::{cell::UnsafeCell, io, rc::Rc}; #[cfg(windows)] -use super::legacy::iocp::SocketState; +use super::legacy::iocp::SocketState as RawFd; use super::CURRENT; // Tracks in-flight operations on a file descriptor. Ensures all in-flight @@ -17,12 +17,9 @@ pub(crate) struct SharedFd { struct Inner { // Open file descriptor - #[cfg(unix)] + #[cfg(any(unix, windows))] fd: RawFd, - #[cfg(windows)] - fd: SocketState, - // Waker to notify when the close operation completes. state: UnsafeCell, } @@ -34,6 +31,85 @@ enum State { Legacy(Option), } +#[cfg(feature = "poll-io")] +impl State { + #[cfg(all(target_os = "linux", feature = "iouring"))] + #[allow(unreachable_patterns)] + pub(crate) fn cvt_uring_poll(&mut self, fd: RawFd) -> io::Result<()> { + let state = match self { + State::Uring(state) => state, + _ => return Ok(()), + }; + // TODO: only Init state can convert? + if matches!(state, UringState::Init) { + let mut source = mio::unix::SourceFd(&fd); + crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))?; + let reg = CURRENT + .with(|inner| match inner { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(r) => super::IoUringDriver::register_poll_io( + r, + &mut source, + super::ready::RW_INTERESTS, + ), + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), + }) + .map_err(|e| { + let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, 0)); + e + })?; + *state = UringState::Legacy(Some(reg)); + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "not clear uring state", + )); + } + Ok(()) + } + + #[cfg(not(all(target_os = "linux", feature = "iouring")))] + #[inline] + pub(crate) fn cvt_uring_poll(&mut self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } + + #[cfg(all(target_os = "linux", feature = "iouring"))] + pub(crate) fn cvt_comp(&mut self, fd: RawFd) -> io::Result<()> { + let inner = match self { + Self::Uring(UringState::Legacy(inner)) => inner, + _ => return Ok(()), + }; + let Some(token) = inner else { + return Err(io::Error::new(io::ErrorKind::Other, "empty token")); + }; + let mut source = mio::unix::SourceFd(&fd); + crate::syscall!(fcntl(fd, libc::F_SETFL, 0))?; + CURRENT + .with(|inner| match inner { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(r) => { + super::IoUringDriver::deregister_poll_io(r, &mut source, *token) + } + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), + }) + .map_err(|e| { + let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK)); + e + })?; + *self = State::Uring(UringState::Init); + Ok(()) + } + + #[cfg(not(all(target_os = "linux", feature = "iouring")))] + #[inline] + pub(crate) fn cvt_comp(&mut self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } +} + impl std::fmt::Debug for Inner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Inner").field("fd", &self.fd).finish() @@ -53,6 +129,10 @@ enum UringState { /// The FD is fully closed Closed, + + /// Poller + #[cfg(feature = "poll-io")] + Legacy(Option), } #[cfg(unix)] @@ -72,24 +152,45 @@ impl AsRawSocket for SharedFd { impl SharedFd { #[cfg(unix)] #[allow(unreachable_code, unused)] - pub(crate) fn new(fd: RawFd) -> io::Result { - #[cfg(all(unix, feature = "legacy"))] - const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); + pub(crate) fn new(fd: RawFd) -> io::Result { + enum Reg { + Uring, + #[cfg(feature = "poll-io")] + UringLegacy(io::Result), + Legacy(io::Result), + } #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] let state = match CURRENT.with(|inner| match inner { - super::Inner::Uring(_) => None, + super::Inner::Uring(inner) => match FORCE_LEGACY { + false => Reg::Uring, + true => { + #[cfg(feature = "poll-io")] + { + let mut source = mio::unix::SourceFd(&fd); + Reg::UringLegacy(super::IoUringDriver::register_poll_io( + inner, + &mut source, + super::ready::RW_INTERESTS, + )) + } + #[cfg(not(feature = "poll-io"))] + Reg::Uring + } + }, super::Inner::Legacy(inner) => { let mut source = mio::unix::SourceFd(&fd); - Some(super::legacy::LegacyDriver::register( + Reg::Legacy(super::legacy::LegacyDriver::register( inner, &mut source, - RW_INTERESTS, + super::ready::RW_INTERESTS, )) } }) { - Some(reg) => State::Legacy(Some(reg?)), - None => State::Uring(UringState::Init), + Reg::Uring => State::Uring(UringState::Init), + #[cfg(feature = "poll-io")] + Reg::UringLegacy(idx) => State::Uring(UringState::Legacy(Some(idx?))), + Reg::Legacy(idx) => State::Legacy(Some(idx?)), }; #[cfg(all(not(feature = "legacy"), target_os = "linux", feature = "iouring"))] @@ -104,7 +205,11 @@ impl SharedFd { let reg = CURRENT.with(|inner| match inner { super::Inner::Legacy(inner) => { let mut source = mio::unix::SourceFd(&fd); - super::legacy::LegacyDriver::register(inner, &mut source, RW_INTERESTS) + super::legacy::LegacyDriver::register( + inner, + &mut source, + super::ready::RW_INTERESTS, + ) } }); @@ -131,7 +236,7 @@ impl SharedFd { pub(crate) fn new(fd: RawSocket) -> io::Result { const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - let mut fd = SocketState::new(fd); + let mut fd = RawFd::new(fd); let state = { let reg = CURRENT.with(|inner| match inner { @@ -186,7 +291,7 @@ impl SharedFd { SharedFd { inner: Rc::new(Inner { - fd: SocketState::new(fd), + fd: RawFd::new(fd), state: UnsafeCell::new(state), }), } @@ -290,6 +395,8 @@ impl SharedFd { pub(crate) fn registered_index(&self) -> Option { let state = unsafe { &*self.inner.state.get() }; match state { + #[cfg(all(target_os = "linux", feature = "iouring", feature = "poll-io"))] + State::Uring(UringState::Legacy(s)) => *s, #[cfg(all(target_os = "linux", feature = "iouring"))] State::Uring(_) => None, #[cfg(feature = "legacy")] @@ -329,6 +436,20 @@ impl SharedFd { } } } + + #[cfg(feature = "poll-io")] + #[inline] + pub(crate) fn cvt_poll(&mut self) -> io::Result<()> { + let state = unsafe { &mut *self.inner.state.get() }; + state.cvt_uring_poll(self.inner.fd) + } + + #[cfg(feature = "poll-io")] + #[inline] + pub(crate) fn cvt_comp(&mut self) -> io::Result<()> { + let state = unsafe { &mut *self.inner.state.get() }; + state.cvt_comp(self.inner.fd) + } } #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -368,6 +489,8 @@ impl Inner { Poll::Ready(()) } UringState::Closed => Poll::Ready(()), + #[cfg(feature = "poll-io")] + UringState::Legacy(_) => Poll::Ready(()), }; } Poll::Ready(()) @@ -389,44 +512,67 @@ impl Drop for Inner { }; } #[cfg(feature = "legacy")] - State::Legacy(idx) => { - if CURRENT.is_set() { - CURRENT.with(|inner| { - match inner { - #[cfg(all(target_os = "linux", feature = "iouring"))] - super::Inner::Uring(_) => { - unreachable!("close legacy fd with uring runtime") - } - #[cfg(all(unix, feature = "legacy"))] - super::Inner::Legacy(inner) => { - // deregister it from driver(Poll and slab) and close fd - if let Some(idx) = idx { - let mut source = mio::unix::SourceFd(&fd); - let _ = super::legacy::LegacyDriver::deregister( - inner, - *idx, - &mut source, - ); - } - } - #[cfg(windows)] - super::Inner::Legacy(inner) => { - // deregister it from driver(Poll and slab) and close fd - if let Some(idx) = idx { - let _ = super::legacy::LegacyDriver::deregister( - inner, *idx, &mut fd, - ); - } - } - } - }) + State::Legacy(idx) => drop_legacy(fd, *idx), + #[cfg(all(target_os = "linux", feature = "iouring", feature = "poll-io"))] + State::Uring(UringState::Legacy(idx)) => drop_uring_legacy(fd, *idx), + _ => {} + } + } +} + +#[cfg(feature = "legacy")] +fn drop_legacy(fd: RawFd, idx: Option) { + if CURRENT.is_set() { + CURRENT.with(|inner| { + #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] + match inner { + #[cfg(all(target_os = "linux", feature = "iouring"))] + super::Inner::Uring(_) => { + unreachable!("close legacy fd with uring runtime") } + super::Inner::Legacy(inner) => { + // deregister it from driver(Poll and slab) and close fd + #[cfg(not(windows))] + if let Some(idx) = idx { + let mut source = mio::unix::SourceFd(&fd); + let _ = super::legacy::LegacyDriver::deregister(inner, idx, &mut source); + } + #[cfg(windows)] + if let Some(idx) = idx { + let _ = super::legacy::LegacyDriver::deregister(inner, idx, &mut fd); + } + } + } + }) + } + #[cfg(all(unix, feature = "legacy"))] + let _ = unsafe { std::fs::File::from_raw_fd(fd) }; + #[cfg(windows)] + let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; +} + +#[cfg(feature = "poll-io")] +fn drop_uring_legacy(fd: RawFd, idx: Option) { + if CURRENT.is_set() { + CURRENT.with(|inner| { + match inner { #[cfg(all(unix, feature = "legacy"))] - let _ = unsafe { std::fs::File::from_raw_fd(fd) }; - #[cfg(windows)] - let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; + super::Inner::Legacy(_) => { + unreachable!("close uring fd with legacy runtime") + } + #[cfg(all(target_os = "linux", feature = "iouring"))] + super::Inner::Uring(inner) => { + // deregister it from driver(Poll and slab) and close fd + if let Some(idx) = idx { + let mut source = mio::unix::SourceFd(&fd); + let _ = super::IoUringDriver::deregister_poll_io(inner, &mut source, idx); + } + } } - _ => {} - } + }) } + #[cfg(unix)] + let _ = unsafe { std::fs::File::from_raw_fd(fd) }; + #[cfg(windows)] + let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; } diff --git a/monoio/src/driver/thread.rs b/monoio/src/driver/thread.rs index abc4b643..e85b2506 100644 --- a/monoio/src/driver/thread.rs +++ b/monoio/src/driver/thread.rs @@ -1,16 +1,20 @@ +#[cfg(feature = "unstable")] +use std::sync::LazyLock; use std::{sync::Mutex, task::Waker}; use flume::Sender; use fxhash::FxHashMap; -use once_cell::sync::Lazy; +#[cfg(not(feature = "unstable"))] +use once_cell::sync::Lazy as LazyLock; use crate::driver::UnparkHandle; -static UNPARK: Lazy>> = - Lazy::new(|| Mutex::new(FxHashMap::default())); +static UNPARK: LazyLock>> = + LazyLock::new(|| Mutex::new(FxHashMap::default())); -static WAKER_SENDER: Lazy>>> = - Lazy::new(|| Mutex::new(FxHashMap::default())); +// Global waker sender map +static WAKER_SENDER: LazyLock>>> = + LazyLock::new(|| Mutex::new(FxHashMap::default())); macro_rules! lock { ($x: ident) => { diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs index 25f61055..a2a06016 100644 --- a/monoio/src/driver/uring/mod.rs +++ b/monoio/src/driver/uring/mod.rs @@ -15,8 +15,12 @@ use lifecycle::Lifecycle; use super::{ op::{CompletionMeta, Op, OpAble}, + // ready::Ready, + // scheduled_io::ScheduledIo, util::timespec, - Driver, Inner, CURRENT, + Driver, + Inner, + CURRENT, }; use crate::utils::slab::Slab; @@ -31,8 +35,10 @@ pub(crate) const CANCEL_USERDATA: u64 = u64::MAX; pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1; #[allow(unused)] pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2; +#[cfg(feature = "poll-io")] +pub(crate) const POLLER_USERDATA: u64 = u64::MAX - 3; -pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 2; +pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 3; /// Driver with uring. pub struct IoUringDriver { @@ -54,6 +60,11 @@ pub(crate) struct UringInner { /// In-flight operations ops: Ops, + #[cfg(feature = "poll-io")] + poll: super::poll::Poll, + #[cfg(feature = "poll-io")] + poller_installed: bool, + /// IoUring bindings uring: ManuallyDrop, @@ -94,6 +105,10 @@ impl IoUringDriver { let uring = ManuallyDrop::new(urb.build(entries)?); let inner = Rc::new(UnsafeCell::new(UringInner { + #[cfg(feature = "poll-io")] + poll: super::poll::Poll::with_capacity(entries as usize)?, + #[cfg(feature = "poll-io")] + poller_installed: false, ops: Ops::new(), ext_arg: uring.params().is_feature_ext_arg(), uring, @@ -124,6 +139,10 @@ impl IoUringDriver { let (waker_sender, waker_receiver) = flume::unbounded::(); let inner = Rc::new(UnsafeCell::new(UringInner { + #[cfg(feature = "poll-io")] + poller_installed: false, + #[cfg(feature = "poll-io")] + poll: super::poll::Poll::with_capacity(entries as usize)?, ops: Ops::new(), ext_arg: uring.params().is_feature_ext_arg(), uring, @@ -174,6 +193,17 @@ impl IoUringDriver { inner.eventfd_installed = true; } + #[cfg(feature = "poll-io")] + fn install_poller(&self, inner: &mut UringInner, fd: RawFd) { + let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _) + .build() + .user_data(POLLER_USERDATA); + + let mut sq = inner.uring.submission(); + let _ = unsafe { sq.push(&entry) }; + inner.poller_installed = true; + } + fn install_timeout(&self, inner: &mut UringInner, duration: Duration) { let timespec = timespec(duration); unsafe { @@ -225,6 +255,10 @@ impl IoUringDriver { if !inner.eventfd_installed { space += 1; } + #[cfg(feature = "poll-io")] + if !inner.poller_installed { + space += 1; + } if timeout.is_some() { space += 1; } @@ -232,11 +266,19 @@ impl IoUringDriver { Self::flush_space(inner, space)?; } - // 2. install eventfd and timeout + // 2.1 install poller + #[cfg(feature = "poll-io")] + if !inner.poller_installed { + self.install_poller(inner, inner.poll.as_raw_fd()); + } + + // 2.2 install eventfd and timeout #[cfg(feature = "sync")] if !inner.eventfd_installed { self.install_eventfd(inner, inner.shared_waker.as_raw_fd()); } + + // 2.3 install timeout and submit_and_wait with timeout if let Some(duration) = timeout { match inner.ext_arg { // Submit and Wait with timeout in an TimeoutOp way. @@ -274,10 +316,32 @@ impl IoUringDriver { .store(true, std::sync::atomic::Ordering::Release); // Process CQ - inner.tick(); + inner.tick()?; Ok(()) } + + #[cfg(feature = "poll-io")] + #[inline] + pub(crate) fn register_poll_io( + this: &Rc>, + source: &mut impl mio::event::Source, + interest: mio::Interest, + ) -> io::Result { + let inner = unsafe { &mut *this.get() }; + inner.poll.register(source, interest) + } + + #[cfg(feature = "poll-io")] + #[inline] + pub(crate) fn deregister_poll_io( + this: &Rc>, + source: &mut impl mio::event::Source, + token: usize, + ) -> io::Result<()> { + let inner = unsafe { &mut *this.get() }; + inner.poll.deregister(source, token) + } } impl Driver for IoUringDriver { @@ -291,7 +355,7 @@ impl Driver for IoUringDriver { fn submit(&self) -> io::Result<()> { let inner = unsafe { &mut *self.inner.get() }; inner.submit()?; - inner.tick(); + inner.tick()?; Ok(()) } @@ -313,7 +377,7 @@ impl Driver for IoUringDriver { } impl UringInner { - fn tick(&mut self) { + fn tick(&mut self) -> io::Result<()> { let cq = self.uring.completion(); for cqe in cq { @@ -321,10 +385,16 @@ impl UringInner { match index { #[cfg(feature = "sync")] EVENTFD_USERDATA => self.eventfd_installed = false, + #[cfg(feature = "poll-io")] + POLLER_USERDATA => { + self.poller_installed = false; + self.poll.tick(Some(Duration::ZERO))?; + } _ if index >= MIN_REVERSED_USERDATA => (), _ => self.ops.complete(index as _, resultify(&cqe), cqe.flags()), } } + Ok(()) } fn submit(&mut self) -> io::Result<()> { @@ -332,21 +402,19 @@ impl UringInner { match self.uring.submit() { #[cfg(feature = "unstable")] Err(ref e) - if e.kind() == io::ErrorKind::Other - || e.kind() == io::ErrorKind::ResourceBusy => + if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) => { - self.tick(); + self.tick()?; } #[cfg(not(feature = "unstable"))] Err(ref e) - if e.raw_os_error() == Some(libc::EAGAIN) - || e.raw_os_error() == Some(libc::EBUSY) => + if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) => { // This error is constructed with io::Error::last_os_error(): // https://github.com/tokio-rs/io-uring/blob/01c83bbce965d4aaf93ebfaa08c3aa8b7b0f5335/src/sys/mod.rs#L32 // So we can use https://doc.rust-lang.org/nightly/std/io/struct.Error.html#method.raw_os_error // to get the raw error code. - self.tick(); + self.tick()?; } e => return e.map(|_| ()), } @@ -412,6 +480,31 @@ impl UringInner { lifecycle.poll_op(cx) } + #[cfg(feature = "poll-io")] + pub(crate) fn poll_legacy_op( + this: &Rc>, + data: &mut T, + cx: &mut Context<'_>, + ) -> Poll { + let inner = unsafe { &mut *this.get() }; + let (direction, index) = match data.legacy_interest() { + Some(x) => x, + None => { + // if there is no index provided, it means the action does not rely on fd + // readiness. do syscall right now. + return Poll::Ready(CompletionMeta { + result: OpAble::legacy_call(data), + flags: 0, + }); + } + }; + + // wait io ready and do syscall + inner + .poll + .poll_syscall(cx, index, direction, || OpAble::legacy_call(data)) + } + pub(crate) fn drop_op( this: &Rc>, index: usize, diff --git a/monoio/src/fs/file.rs b/monoio/src/fs/file.rs index 3947a0d7..6df697d4 100644 --- a/monoio/src/fs/file.rs +++ b/monoio/src/fs/file.rs @@ -137,7 +137,7 @@ impl File { #[cfg(unix)] pub fn from_std(std: StdFile) -> io::Result { Ok(File { - fd: SharedFd::new(std.into_raw_fd())?, + fd: SharedFd::new::(std.into_raw_fd())?, }) } diff --git a/monoio/src/io/mod.rs b/monoio/src/io/mod.rs index fd9dcca3..7330c94b 100644 --- a/monoio/src/io/mod.rs +++ b/monoio/src/io/mod.rs @@ -26,6 +26,9 @@ pub use async_write_rent::{AsyncWriteRent, AsyncWriteRentAt}; pub use async_write_rent_ext::AsyncWriteRentExt; mod util; + +#[cfg(feature = "poll-io")] +pub use tokio::io as poll_io; pub(crate) use util::operation_canceled; #[cfg(all(target_os = "linux", feature = "splice"))] pub use util::zero_copy; @@ -33,3 +36,34 @@ pub use util::{ copy, BufReader, BufWriter, CancelHandle, Canceller, OwnedReadHalf, OwnedWriteHalf, PrefixedReadIo, Split, Splitable, }; +#[cfg(feature = "poll-io")] +/// Convert a completion-based io to a poll-based io. +pub trait IntoPollIo: Sized { + /// The poll-based io type. + type PollIo; + + /// Convert a completion-based io to a poll-based io(able to get comp_io back). + fn try_into_poll_io(self) -> Result; + + /// Convert a completion-based io to a poll-based io. + #[inline] + fn into_poll_io(self) -> std::io::Result { + self.try_into_poll_io().map_err(|(e, _)| e) + } +} + +#[cfg(feature = "poll-io")] +/// Convert a poll-based io to a completion-based io. +pub trait IntoCompIo: Sized { + /// The completion-based io type. + type CompIo; + + /// Convert a poll-based io to a completion-based io(able to get poll_io back). + fn try_into_comp_io(self) -> Result; + + /// Convert a poll-based io to a completion-based io. + #[inline] + fn into_comp_io(self) -> std::io::Result { + self.try_into_comp_io().map_err(|(e, _)| e) + } +} diff --git a/monoio/src/io/util/buf_reader.rs b/monoio/src/io/util/buf_reader.rs index f081749d..174ed41d 100644 --- a/monoio/src/io/util/buf_reader.rs +++ b/monoio/src/io/util/buf_reader.rs @@ -20,11 +20,13 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024; impl BufReader { /// Create BufReader with default buffer size + #[inline] pub fn new(inner: R) -> Self { Self::with_capacity(DEFAULT_BUF_SIZE, inner) } /// Create BufReader with given buffer size + #[inline] pub fn with_capacity(capacity: usize, inner: R) -> Self { let buffer = vec![0; capacity]; Self { @@ -38,11 +40,13 @@ impl BufReader { /// Gets a reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. - pub fn get_ref(&self) -> &R { + #[inline] + pub const fn get_ref(&self) -> &R { &self.inner } /// Gets a mutable reference to the underlying reader. + #[inline] pub fn get_mut(&mut self) -> &mut R { &mut self.inner } @@ -50,6 +54,7 @@ impl BufReader { /// Consumes this `BufReader`, returning the underlying reader. /// /// Note that any leftover data in the internal buffer is lost. + #[inline] pub fn into_inner(self) -> R { self.inner } @@ -58,6 +63,7 @@ impl BufReader { /// /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is /// empty. + #[inline] pub fn buffer(&self) -> &[u8] { &self.buf.as_ref().expect("unable to take buffer")[self.pos..self.cap] } diff --git a/monoio/src/io/util/buf_writer.rs b/monoio/src/io/util/buf_writer.rs index 16e91d25..92a52413 100644 --- a/monoio/src/io/util/buf_writer.rs +++ b/monoio/src/io/util/buf_writer.rs @@ -20,11 +20,13 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024; impl BufWriter { /// Create BufWriter with default buffer size + #[inline] pub fn new(inner: W) -> Self { Self::with_capacity(DEFAULT_BUF_SIZE, inner) } /// Create BufWriter with given buffer size + #[inline] pub fn with_capacity(capacity: usize, inner: W) -> Self { let buffer = vec![0; capacity]; Self { diff --git a/monoio/src/io/util/cancel.rs b/monoio/src/io/util/cancel.rs index fdc56e32..60bafc3f 100644 --- a/monoio/src/io/util/cancel.rs +++ b/monoio/src/io/util/cancel.rs @@ -29,6 +29,7 @@ struct Shared { impl Canceller { /// Create a new Canceller. + #[inline] pub fn new() -> Self { Default::default() } @@ -55,6 +56,7 @@ impl Canceller { } /// Create a CancelHandle which can be used to pass to io operation. + #[inline] pub fn handle(&self) -> CancelHandle { CancelHandle { shared: self.shared.clone(), diff --git a/monoio/src/io/util/prefixed_io.rs b/monoio/src/io/util/prefixed_io.rs index f5c2d29e..6c6d3730 100644 --- a/monoio/src/io/util/prefixed_io.rs +++ b/monoio/src/io/util/prefixed_io.rs @@ -40,7 +40,7 @@ pub struct PrefixedReadIo { impl PrefixedReadIo { /// Create a PrefixedIo with given io and read prefix. - pub fn new(io: I, prefix: P) -> Self { + pub const fn new(io: I, prefix: P) -> Self { Self { io, prefix, @@ -49,11 +49,12 @@ impl PrefixedReadIo { } /// If the prefix has read to eof - pub fn prefix_finished(&self) -> bool { + pub const fn prefix_finished(&self) -> bool { self.prefix_finished } /// Into inner + #[inline] pub fn into_inner(self) -> I { self.io } diff --git a/monoio/src/macros/scoped_tls.rs b/monoio/src/macros/scoped_tls.rs index fe065657..25980628 100644 --- a/monoio/src/macros/scoped_tls.rs +++ b/monoio/src/macros/scoped_tls.rs @@ -122,6 +122,7 @@ impl ScopedKey { } /// Test whether this TLS key has been `set` for the current thread. + #[inline] pub fn is_set(&'static self) -> bool { self.inner.with(|c| !c.get().is_null()) } diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 4695ab39..61ae8990 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -84,7 +84,7 @@ impl TcpListener { } #[cfg(unix)] - let fd = SharedFd::new(sys_listener.into_raw_fd())?; + let fd = SharedFd::new::(sys_listener.into_raw_fd())?; #[cfg(windows)] let fd = unimplemented!(); @@ -110,7 +110,7 @@ impl TcpListener { let fd = completion.meta.result?; // Construct stream - let stream = TcpStream::from_shared_fd(SharedFd::new(fd as _)?); + let stream = TcpStream::from_shared_fd(SharedFd::new::(fd as _)?); // Construct SocketAddr let storage = completion.data.addr.0.as_ptr(); @@ -163,7 +163,7 @@ impl TcpListener { let fd = completion.meta.result?; // Construct stream - let stream = TcpStream::from_shared_fd(SharedFd::new(fd as _)?); + let stream = TcpStream::from_shared_fd(SharedFd::new::(fd as _)?); // Construct SocketAddr let storage = completion.data.addr.0.as_ptr(); @@ -247,7 +247,7 @@ impl TcpListener { /// Creates new `TcpListener` from a `std::net::TcpListener`. pub fn from_std(stdl: std::net::TcpListener) -> io::Result { - match SharedFd::new(stdl.as_raw_fd()) { + match SharedFd::new::(stdl.as_raw_fd()) { Ok(shared) => { stdl.into_raw_fd(); Ok(Self::from_shared_fd(shared)) diff --git a/monoio/src/net/tcp/mod.rs b/monoio/src/net/tcp/mod.rs index c092c28a..ca5fd197 100644 --- a/monoio/src/net/tcp/mod.rs +++ b/monoio/src/net/tcp/mod.rs @@ -9,3 +9,6 @@ mod tfo; pub use listener::TcpListener; pub use split::{TcpOwnedReadHalf, TcpOwnedWriteHalf}; pub use stream::{TcpConnectOpts, TcpStream}; + +#[cfg(feature = "poll-io")] +pub mod stream_poll; diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index b43ae199..169d33a4 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -59,7 +59,7 @@ impl TcpConnectOpts { } /// TcpStream pub struct TcpStream { - fd: SharedFd, + pub(super) fd: SharedFd, meta: StreamMeta, } @@ -130,7 +130,7 @@ impl TcpStream { tfo = false; } } - let op = Op::connect(SharedFd::new(socket)?, addr, tfo)?; + let op = Op::connect(SharedFd::new::(socket)?, addr, tfo)?; let completion = op.await; completion.meta.result?; @@ -204,7 +204,7 @@ impl TcpStream { /// Creates new `TcpStream` from a `std::net::TcpStream`. pub fn from_std(stream: std::net::TcpStream) -> io::Result { - match SharedFd::new(stream.as_raw_fd()) { + match SharedFd::new::(stream.as_raw_fd()) { Ok(shared) => { stream.into_raw_fd(); Ok(Self::from_shared_fd(shared)) diff --git a/monoio/src/net/tcp/stream_poll.rs b/monoio/src/net/tcp/stream_poll.rs new file mode 100644 index 00000000..69435135 --- /dev/null +++ b/monoio/src/net/tcp/stream_poll.rs @@ -0,0 +1,176 @@ +//! This module provide a poll-io style interface for TcpStream. + +use std::{io, net::SocketAddr, os::fd::AsRawFd, time::Duration}; + +use super::TcpStream; +use crate::driver::op::Op; + +/// A TcpStream with poll-io style interface. +/// Using this struct, you can use TcpStream in a poll-like way. +/// Underlying, it is based on a uring-based epoll. +#[derive(Debug)] +pub struct TcpStreamPoll(TcpStream); + +impl crate::io::IntoPollIo for TcpStream { + type PollIo = TcpStreamPoll; + + #[inline] + fn try_into_poll_io(self) -> Result { + self.try_into_poll_io() + } +} + +impl TcpStream { + /// Convert to poll-io style TcpStreamPoll + #[inline] + pub fn try_into_poll_io(mut self) -> Result { + match self.fd.cvt_poll() { + Ok(_) => Ok(TcpStreamPoll(self)), + Err(e) => Err((e, self)), + } + } +} + +impl crate::io::IntoCompIo for TcpStreamPoll { + type CompIo = TcpStream; + + #[inline] + fn try_into_comp_io(self) -> Result { + self.try_into_comp_io() + } +} + +impl TcpStreamPoll { + /// Convert to normal TcpStream + #[inline] + pub fn try_into_comp_io(mut self) -> Result { + match self.0.fd.cvt_comp() { + Ok(_) => Ok(self.0), + Err(e) => Err((e, self)), + } + } +} + +impl tokio::io::AsyncRead for TcpStreamPoll { + #[inline] + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + unsafe { + let slice = buf.unfilled_mut(); + let raw_buf = crate::buf::RawBuf::new(slice.as_ptr() as *const u8, slice.len()); + let mut recv = Op::recv_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut recv, cx)); + + std::task::Poll::Ready(ret.result.map(|n| { + buf.assume_init(n as usize); + buf.advance(n as usize); + })) + } + } +} + +impl tokio::io::AsyncWrite for TcpStreamPoll { + #[inline] + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + unsafe { + let raw_buf = crate::buf::RawBuf::new(buf.as_ptr(), buf.len()); + let mut send = Op::send_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut send, cx)); + + std::task::Poll::Ready(ret.result.map(|n| n as usize)) + } + } + + #[inline] + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + #[inline] + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let fd = self.0.as_raw_fd(); + let res = match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { + -1 => Err(io::Error::last_os_error()), + _ => Ok(()), + }; + std::task::Poll::Ready(res) + } + + #[inline] + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + unsafe { + let raw_buf = + crate::buf::RawBufVectored::new(bufs.as_ptr() as *const libc::iovec, bufs.len()); + let mut writev = Op::writev_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); + + std::task::Poll::Ready(ret.result.map(|n| n as usize)) + } + } + + #[inline] + fn is_write_vectored(&self) -> bool { + true + } +} + +impl TcpStreamPoll { + /// Return the local address that this stream is bound to. + #[inline] + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + + /// Return the remote address that this stream is connected to. + #[inline] + pub fn peer_addr(&self) -> io::Result { + self.0.peer_addr() + } + + /// Get the value of the `TCP_NODELAY` option on this socket. + #[inline] + pub fn nodelay(&self) -> io::Result { + self.0.nodelay() + } + + /// Set the value of the `TCP_NODELAY` option on this socket. + #[inline] + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.0.set_nodelay(nodelay) + } + + /// Set the value of the `SO_KEEPALIVE` option on this socket. + #[inline] + pub fn set_tcp_keepalive( + &self, + time: Option, + interval: Option, + retries: Option, + ) -> io::Result<()> { + self.0.set_tcp_keepalive(time, interval, retries) + } +} + +impl AsRawFd for TcpStreamPoll { + #[inline] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.0.as_raw_fd() + } +} diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs index c99ed49f..abeb3764 100644 --- a/monoio/src/net/udp.rs +++ b/monoio/src/net/udp.rs @@ -64,7 +64,7 @@ impl UdpSocket { socket.bind(&addr)?; #[cfg(unix)] - let fd = SharedFd::new(socket.into_raw_fd())?; + let fd = SharedFd::new::(socket.into_raw_fd())?; #[cfg(windows)] let fd = unimplemented!(); @@ -134,7 +134,7 @@ impl UdpSocket { /// Creates new `UdpSocket` from a `std::net::UdpSocket`. pub fn from_std(socket: std::net::UdpSocket) -> io::Result { - match SharedFd::new(socket.as_raw_fd()) { + match SharedFd::new::(socket.as_raw_fd()) { Ok(shared) => { socket.into_raw_fd(); Ok(Self::from_shared_fd(shared)) diff --git a/monoio/src/net/unix/datagram/mod.rs b/monoio/src/net/unix/datagram/mod.rs index eb07981a..b31bf6ee 100644 --- a/monoio/src/net/unix/datagram/mod.rs +++ b/monoio/src/net/unix/datagram/mod.rs @@ -63,7 +63,7 @@ impl UnixDatagram { socklen: libc::socklen_t, ) -> io::Result { let socket = new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)?; - let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?; + let op = Op::connect_unix(SharedFd::new::(socket)?, sockaddr, socklen)?; let completion = op.await; completion.meta.result?; @@ -72,7 +72,7 @@ impl UnixDatagram { /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`. pub fn from_std(datagram: StdUnixDatagram) -> io::Result { - match SharedFd::new(datagram.as_raw_fd()) { + match SharedFd::new::(datagram.as_raw_fd()) { Ok(shared) => { datagram.into_raw_fd(); Ok(Self::from_shared_fd(shared)) diff --git a/monoio/src/net/unix/listener.rs b/monoio/src/net/unix/listener.rs index 3f4b47d1..04fa114d 100644 --- a/monoio/src/net/unix/listener.rs +++ b/monoio/src/net/unix/listener.rs @@ -53,7 +53,7 @@ impl UnixListener { sys_listener.bind(&addr)?; sys_listener.listen(config.backlog)?; - let fd = SharedFd::new(sys_listener.into_raw_fd())?; + let fd = SharedFd::new::(sys_listener.into_raw_fd())?; Ok(Self::from_shared_fd(fd)) } @@ -75,7 +75,7 @@ impl UnixListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixStream::from_shared_fd(SharedFd::new(fd as _)?); + let stream = UnixStream::from_shared_fd(SharedFd::new::(fd as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; @@ -105,7 +105,7 @@ impl UnixListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixStream::from_shared_fd(SharedFd::new(fd as _)?); + let stream = UnixStream::from_shared_fd(SharedFd::new::(fd as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; @@ -135,7 +135,7 @@ impl UnixListener { /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener`. pub fn from_std(sys_listener: std::os::unix::net::UnixListener) -> io::Result { - match SharedFd::new(sys_listener.as_raw_fd()) { + match SharedFd::new::(sys_listener.as_raw_fd()) { Ok(shared) => Ok(Self { fd: shared, sys_listener: Some(sys_listener), diff --git a/monoio/src/net/unix/mod.rs b/monoio/src/net/unix/mod.rs index 82f2453a..fa081150 100644 --- a/monoio/src/net/unix/mod.rs +++ b/monoio/src/net/unix/mod.rs @@ -20,6 +20,9 @@ pub use socket_addr::SocketAddr; pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf}; pub use stream::UnixStream; +#[cfg(feature = "poll-io")] +pub mod stream_poll; + pub(crate) fn path_offset(sockaddr: &libc::sockaddr_un) -> usize { let base = sockaddr as *const _ as usize; let path = &sockaddr.sun_path as *const _ as usize; diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs index 72feabcd..0fa3ca5c 100644 --- a/monoio/src/net/unix/seq_packet/listener.rs +++ b/monoio/src/net/unix/seq_packet/listener.rs @@ -29,7 +29,7 @@ impl UnixSeqpacketListener { crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?; crate::syscall!(listen(socket, backlog))?; Ok(Self { - fd: SharedFd::new(socket)?, + fd: SharedFd::new::(socket)?, }) } @@ -50,7 +50,7 @@ impl UnixSeqpacketListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixSeqpacket::from_shared_fd(SharedFd::new(fd as _)?); + let stream = UnixSeqpacket::from_shared_fd(SharedFd::new::(fd as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; diff --git a/monoio/src/net/unix/seq_packet/mod.rs b/monoio/src/net/unix/seq_packet/mod.rs index 1c48bbeb..74f23669 100644 --- a/monoio/src/net/unix/seq_packet/mod.rs +++ b/monoio/src/net/unix/seq_packet/mod.rs @@ -34,8 +34,8 @@ impl UnixSeqpacket { pub fn pair() -> io::Result<(Self, Self)> { let (a, b) = pair(libc::SOCK_SEQPACKET)?; Ok(( - Self::from_shared_fd(SharedFd::new(a)?), - Self::from_shared_fd(SharedFd::new(b)?), + Self::from_shared_fd(SharedFd::new::(a)?), + Self::from_shared_fd(SharedFd::new::(b)?), )) } @@ -57,7 +57,7 @@ impl UnixSeqpacket { socklen: libc::socklen_t, ) -> io::Result { let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?; - let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?; + let op = Op::connect_unix(SharedFd::new::(socket)?, sockaddr, socklen)?; let completion = op.await; completion.meta.result?; diff --git a/monoio/src/net/unix/stream.rs b/monoio/src/net/unix/stream.rs index a4ac01a5..830ec6bc 100644 --- a/monoio/src/net/unix/stream.rs +++ b/monoio/src/net/unix/stream.rs @@ -23,7 +23,7 @@ use crate::{ /// UnixStream pub struct UnixStream { - fd: SharedFd, + pub(super) fd: SharedFd, } /// UnixStream is safe to split to two parts @@ -52,7 +52,7 @@ impl UnixStream { socklen: libc::socklen_t, ) -> io::Result { let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?; - let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?; + let op = Op::connect_unix(SharedFd::new::(socket)?, sockaddr, socklen)?; let completion = op.await; completion.meta.result?; @@ -86,7 +86,7 @@ impl UnixStream { /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. pub fn from_std(stream: std::os::unix::net::UnixStream) -> io::Result { - match SharedFd::new(stream.as_raw_fd()) { + match SharedFd::new::(stream.as_raw_fd()) { Ok(shared) => { stream.into_raw_fd(); Ok(Self::from_shared_fd(shared)) diff --git a/monoio/src/net/unix/stream_poll.rs b/monoio/src/net/unix/stream_poll.rs new file mode 100644 index 00000000..f3516dd5 --- /dev/null +++ b/monoio/src/net/unix/stream_poll.rs @@ -0,0 +1,146 @@ +//! This module provide a poll-io style interface for UnixStream. + +use std::{io, os::fd::AsRawFd}; + +use super::{SocketAddr, UnixStream}; +use crate::driver::op::Op; + +/// A UnixStream with poll-io style interface. +/// Using this struct, you can use UnixStream in a poll-like way. +/// Underlying, it is based on a uring-based epoll. +#[derive(Debug)] +pub struct UnixStreamPoll(UnixStream); + +impl crate::io::IntoPollIo for UnixStream { + type PollIo = UnixStreamPoll; + + #[inline] + fn try_into_poll_io(self) -> Result { + self.try_into_poll_io() + } +} + +impl UnixStream { + /// Convert to poll-io style UnixStreamPoll + #[inline] + pub fn try_into_poll_io(mut self) -> Result { + match self.fd.cvt_poll() { + Ok(_) => Ok(UnixStreamPoll(self)), + Err(e) => Err((e, self)), + } + } +} + +impl crate::io::IntoCompIo for UnixStreamPoll { + type CompIo = UnixStream; + + #[inline] + fn try_into_comp_io(self) -> Result { + self.try_into_comp_io() + } +} + +impl UnixStreamPoll { + /// Convert to normal UnixStream + #[inline] + pub fn try_into_comp_io(mut self) -> Result { + match self.0.fd.cvt_comp() { + Ok(_) => Ok(self.0), + Err(e) => Err((e, self)), + } + } +} + +impl tokio::io::AsyncRead for UnixStreamPoll { + #[inline] + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + unsafe { + let slice = buf.unfilled_mut(); + let raw_buf = crate::buf::RawBuf::new(slice.as_ptr() as *const u8, slice.len()); + let mut recv = Op::recv_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut recv, cx)); + + std::task::Poll::Ready(ret.result.map(|n| { + buf.assume_init(n as usize); + buf.advance(n as usize); + })) + } + } +} + +impl tokio::io::AsyncWrite for UnixStreamPoll { + #[inline] + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + unsafe { + let raw_buf = crate::buf::RawBuf::new(buf.as_ptr(), buf.len()); + let mut send = Op::send_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut send, cx)); + + std::task::Poll::Ready(ret.result.map(|n| n as usize)) + } + } + + #[inline] + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + #[inline] + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let fd = self.0.as_raw_fd(); + let res = match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { + -1 => Err(io::Error::last_os_error()), + _ => Ok(()), + }; + std::task::Poll::Ready(res) + } + + #[inline] + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + unsafe { + let raw_buf = + crate::buf::RawBufVectored::new(bufs.as_ptr() as *const libc::iovec, bufs.len()); + let mut writev = Op::writev_raw(&self.0.fd, raw_buf); + let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); + + std::task::Poll::Ready(ret.result.map(|n| n as usize)) + } + } + + #[inline] + fn is_write_vectored(&self) -> bool { + true + } +} + +impl UnixStreamPoll { + /// Returns the socket address of the local half of this connection. + #[inline] + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + + /// Returns the socket address of the remote half of this connection. + #[inline] + pub fn peer_addr(&self) -> io::Result { + self.0.peer_addr() + } +}