-
Notifications
You must be signed in to change notification settings - Fork 226
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: support poll-io feature to run epoll over iouring
- Loading branch information
Showing
56 changed files
with
1,473 additions
and
546 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,5 @@ members = [ | |
|
||
# Internal | ||
"examples", | ||
"examples/tokio-io-compat", | ||
] | ||
resolver = "2" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,50 @@ | ||
--- | ||
title: Compatible with Tokio ecology | ||
date: 2021-12-17 18:00:00 | ||
updated: 2024-01-30 15:00:00 | ||
author: ihciah | ||
--- | ||
|
||
# Compatible with Tokio ecology | ||
A large number of existing Rust components are compatible with Tokio, and they directly rely on Tokio's `AsyncRead` and `AsyncWrite` interfaces. | ||
# Compatible with the Tokio Ecosystem | ||
A large number of existing Rust components are compatible with Tokio and directly depend on Tokio's `AsyncRead` and `AsyncWrite` traits. | ||
|
||
In Monoio, since the bottom layer is an asynchronous system call, we chose a similar tokio-uring approach: providing an IO interface that transfers the ownership of the buffer. But at this stage, obviously there are not many available libraries that can work, so we need to quickly support functions with a certain performance sacrifice. | ||
In Monoio, due to the underlying asynchronous system calls, we chose an approach similar to tokio-uring: providing IO interfaces that transfer buffer ownership. However, at this stage, there are obviously not many libraries available that work with this, so we need some means to be compatible with existing interface components. | ||
|
||
## monoio-compat | ||
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on the buffer ownership interface. | ||
Currently, there are 3 ways to achieve compatibility: | ||
|
||
## tokio-compat | ||
`tokio-compat` is a feature of monoio. When the `iouring` is disabled and the `legacy` feature is enabled, after turning on the `tokio-compat` feature, `TcpStream`/`UnixStream` will implement `tokio::io::{AsyncRead, AsyncWrite}`. | ||
|
||
If you explicitly do not use iouring, then you can provide compatibility in this way. This form of compatibility has no overhead. If you might use iouring, then you should use the `poll-io` feature. | ||
|
||
### How it works | ||
For `poll_read`, the remaining capacity of the slice passed by the user will be read first, and then the buffer held by the user will be limited to this capacity and a future will be generated. After that, every time the user `poll_read` again, it will be forwarded to the `poll` method of this future. When returning to `Ready`, the contents of the buffer are copied to the slice passed by the user. | ||
## poll-io | ||
`poll-io` is a feature of monoio. After enabling this feature: | ||
1. `tokio::io` will be re-exported to `monoio::io::poll_io` | ||
2. `TcpStream`/`UnixStream` can be converted to and from `TcpStreamPoll`/`UnixStreamPoll` | ||
3. `TcpStreamPoll`/`UnixStreamPoll` implements `tokio::io::{AsyncRead, AsyncWrite}` | ||
|
||
For `poll_write`, the content of the slice passed by the user is copied to its own buffer first, then a future is generated and stored, and Ready is returned immediately. After that, every time the user `poll_write` again, it will first wait for the last content to be sent, then copy the data and return to Ready immediately. It behaves like BufWriter, and may causing errors to be delayed to be perceived. | ||
The underlying implementation of this feature runs epoll to sense fd readiness on top of iouring and directly initiates a syscall. Although this form of compatibility cannot effectively utilize iouring for asynchronous io, its performance is similar to other epoll+syscall implementations without additional copy overhead. | ||
|
||
## monoio-compat | ||
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on an interface that transfers buffer ownership. | ||
|
||
At the cost, using this compatibility layer will cost you an extra buffer copy. | ||
### How It Works | ||
For `poll_read`, it first reads into the remaining capacity of the slice passed by the user, then restricts its own buffer to that capacity and generates a future. Afterwards, every time the user again calls `poll_read`, it will forward to the `poll` method of this future. When returning `Ready`, it copies the contents of the buffer into the user's passed slice. | ||
|
||
For `poll_write`, the content of the slice passed by the user is first copied to its own buffer, and then a future is generated and stored. After that, every time the user `poll_write` again, it will be forwarded to the `poll` method of this future. Return the result to the user when returning `Ready`. | ||
For `poll_write`, it first copies the contents of the user's passed slice into its own buffer, then generates a future and stores it, and immediately returns Ready. Thereafter, every time the user again calls `poll_write`, it will first wait for the content of the last write to be fully sent before copying data and immediately returning Ready. This behavior is similar to that of a BufWriter, which can lead to delayed error detection. | ||
|
||
In other words, using this compatibility layer will cost you an extra buffer copy overhead. | ||
The cost of using this compatibility layer is an additional buffer copy overhead. | ||
|
||
### Usage restrictions | ||
For write operations, users need to manually call poll_flush or poll_shutdown of AsyncWrite at the end (or the corresponding flush or shutdown methods of AsyncWriteExt), otherwise the data may not be submitted to the kernel (continuous writes do not require manual flushing). | ||
### Usage Restrictions | ||
For write operations, users need to manually call AsyncWrite's poll_flush or poll_shutdown (or the corresponding flush or shutdown methods in AsyncWriteExt) at the end; otherwise, data may not be submitted to the kernel (continuous writes do not require manual flushing). | ||
|
||
## Poll-oriented interface and asynchronous system call | ||
There are two ways to express asynchrony in Rust, one is based on `poll` and the other is based on `async`. `poll` is synchronous, semantically expressing an immediate attempt; while `async` is essentially the syntactic sugar of poll, it will swallow the future and generate a state machine, which is executed in a loop during await. | ||
## Poll-Based Interfaces and Asynchronous System Calls | ||
There are two ways to express asynchrony in Rust: one is based on `poll`, and the other is based on `async`. `Poll` is synchronous and semantically expresses the trying immediately; while `async` is essentially syntactic sugar for poll which encapsulates the future and generates a state machine, executing this state machine in a loop when awaiting. | ||
|
||
In Tokio, there are methods similar to `poll_read` and `poll_write`, both of which express the semantics of synchronous attempts. | ||
In Tokio, there are methods like `poll_read` and `poll_write` that both express the semantic of synchronous trying. | ||
|
||
When it returns to `Pending`, it means that IO is not ready (and the waker of cx is registered for notification), and when it returns to `Ready` it means that IO has been completed. It is easy to implement these two interfaces based on synchronous system calls. Directly make the corresponding system calls and determine the return result. If the IO is not ready, it will be suspended to Reactor. | ||
When `Pending` is returned, it implies that the IO is not ready (and registers a notice with the waker in cx), and when `Ready` is returned, it means the IO has completed. It is easy to implement these two interfaces based on synchronous system calls, by directly making the corresponding system call and judging the return result, and if the IO is not ready, suspend to the Reactor. | ||
|
||
However, these two interfaces are difficult to implement under asynchronous system calls. If we have pushed an Op into the io_uring SQ, the state of this syscall is uncertain before the corresponding CQE is consumed. We have no way to provide clear completed or unfinished semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities has led to our use restrictions. Under asynchronous system calls, it is more appropriate to pass the ownership of the buffer and cooperate with `async+await`. | ||
However, these two interfaces are difficult to implement under asynchronous system calls. If we have already pushed an Op into the io_uring SQ, then the status of that syscall is uncertain until we consume the corresponding CQE. We cannot provide a clear completed or not completed semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities leads to our usage restrictions. Under asynchronous system calls, transferring buffer ownership in combination with `async+await` is more appropriate. | ||
|
||
At present, the Rust standard library does not have a interface for asynchronous system calls, and there is no related component ecology. We are working hard to solve this problem. | ||
Currently, Rust's standard library does not have a universal interface oriented towards asynchronous system calls, and neither does the related component ecosystem. We are working hard to improve this problem. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
//! An echo example. | ||
//! | ||
//! Run the example and `nc 127.0.0.1 50002` in another shell. | ||
//! All your input will be echoed out. | ||
use monoio::{ | ||
io::{ | ||
poll_io::{AsyncReadExt, AsyncWriteExt}, | ||
IntoPollIo, | ||
}, | ||
net::{TcpListener, TcpStream}, | ||
}; | ||
|
||
#[monoio::main(driver = "fusion")] | ||
async fn main() { | ||
let listener = TcpListener::bind("127.0.0.1:50002").unwrap(); | ||
println!("listening"); | ||
loop { | ||
let incoming = listener.accept().await; | ||
match incoming { | ||
Ok((stream, addr)) => { | ||
println!("accepted a connection from {addr}"); | ||
monoio::spawn(echo(stream)); | ||
} | ||
Err(e) => { | ||
println!("accepted connection failed: {e}"); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
|
||
async fn echo(stream: TcpStream) -> std::io::Result<()> { | ||
// Convert completion-based io to poll-based io(which impl tokio::io) | ||
let mut stream = stream.into_poll_io()?; | ||
let mut buf: Vec<u8> = vec![0; 1024]; | ||
let mut res; | ||
loop { | ||
// read | ||
res = stream.read(&mut buf).await?; | ||
if res == 0 { | ||
return Ok(()); | ||
} | ||
|
||
// write all | ||
stream.write_all(&buf[0..res]).await?; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,127 +1,60 @@ | ||
//! HTTP client example with hyper in compatible mode. | ||
//! HTTP client example with hyper in poll-io mode. | ||
//! | ||
//! It will try to fetch http://127.0.0.1:23300/monoio and print the | ||
//! It will try to fetch http://httpbin.org/ip and print the | ||
//! response. | ||
//! | ||
//! Note: | ||
//! It is not recommended to use this example as a production code. | ||
//! The `hyper` require `Send` for a future and obviously the future | ||
//! is not `Send` in monoio. So we just use some unsafe code to let | ||
//! it pass which infact not a good solution but the only way to | ||
//! make it work without modifying hyper. | ||
use std::{future::Future, pin::Pin}; | ||
use std::io::Write; | ||
|
||
use monoio_compat::TcpStreamCompat; | ||
use bytes::Bytes; | ||
use http_body_util::{BodyExt, Empty}; | ||
use hyper::Request; | ||
use monoio::{io::IntoPollIo, net::TcpStream}; | ||
use monoio_compat::hyper::MonoioIo; | ||
|
||
#[derive(Clone)] | ||
struct HyperExecutor; | ||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; | ||
|
||
impl<F> hyper::rt::Executor<F> for HyperExecutor | ||
where | ||
F: Future + 'static, | ||
F::Output: 'static, | ||
{ | ||
fn execute(&self, fut: F) { | ||
monoio::spawn(fut); | ||
} | ||
} | ||
async fn fetch_url(url: hyper::Uri) -> Result<()> { | ||
let host = url.host().expect("uri has no host"); | ||
let port = url.port_u16().unwrap_or(80); | ||
let addr = format!("{}:{}", host, port); | ||
let stream = TcpStream::connect(addr).await?.into_poll_io()?; | ||
let io = MonoioIo::new(stream); | ||
|
||
#[derive(Clone)] | ||
struct HyperConnector; | ||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; | ||
monoio::spawn(async move { | ||
if let Err(err) = conn.await { | ||
println!("Connection failed: {:?}", err); | ||
} | ||
}); | ||
|
||
impl tower_service::Service<hyper::Uri> for HyperConnector { | ||
type Response = HyperConnection; | ||
let authority = url.authority().unwrap().clone(); | ||
|
||
type Error = std::io::Error; | ||
let path = url.path(); | ||
let req = Request::builder() | ||
.uri(path) | ||
.header(hyper::header::HOST, authority.as_str()) | ||
.body(Empty::<Bytes>::new())?; | ||
|
||
#[allow(clippy::type_complexity)] | ||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; | ||
|
||
fn poll_ready( | ||
&mut self, | ||
_: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Result<(), Self::Error>> { | ||
std::task::Poll::Ready(Ok(())) | ||
} | ||
let mut res = sender.send_request(req).await?; | ||
|
||
fn call(&mut self, uri: hyper::Uri) -> Self::Future { | ||
let host = uri.host().unwrap(); | ||
let port = uri.port_u16().unwrap_or(80); | ||
let address = format!("{host}:{port}"); | ||
println!("Response: {}", res.status()); | ||
println!("Headers: {:#?}\n", res.headers()); | ||
|
||
#[allow(clippy::type_complexity)] | ||
let b: Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>> = | ||
Box::pin(async move { | ||
let conn = monoio::net::TcpStream::connect(address).await?; | ||
let hyper_conn = HyperConnection(TcpStreamCompat::new(conn)); | ||
Ok(hyper_conn) | ||
}); | ||
unsafe { std::mem::transmute(b) } | ||
// Stream the body, writing each chunk to stdout as we get it | ||
// (instead of buffering and printing at the end). | ||
while let Some(next) = res.frame().await { | ||
let frame = next?; | ||
if let Some(chunk) = frame.data_ref() { | ||
std::io::stdout().write_all(chunk)?; | ||
} | ||
} | ||
} | ||
|
||
struct HyperConnection(TcpStreamCompat); | ||
println!("\n\nDone!"); | ||
|
||
impl tokio::io::AsyncRead for HyperConnection { | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
buf: &mut tokio::io::ReadBuf<'_>, | ||
) -> std::task::Poll<std::io::Result<()>> { | ||
Pin::new(&mut self.0).poll_read(cx, buf) | ||
} | ||
Ok(()) | ||
} | ||
|
||
impl tokio::io::AsyncWrite for HyperConnection { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
buf: &[u8], | ||
) -> std::task::Poll<Result<usize, std::io::Error>> { | ||
Pin::new(&mut self.0).poll_write(cx, buf) | ||
} | ||
|
||
fn poll_flush( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Result<(), std::io::Error>> { | ||
Pin::new(&mut self.0).poll_flush(cx) | ||
} | ||
|
||
fn poll_shutdown( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Result<(), std::io::Error>> { | ||
Pin::new(&mut self.0).poll_shutdown(cx) | ||
} | ||
} | ||
|
||
impl hyper::client::connect::Connection for HyperConnection { | ||
fn connected(&self) -> hyper::client::connect::Connected { | ||
hyper::client::connect::Connected::new() | ||
} | ||
} | ||
|
||
#[allow(clippy::non_send_fields_in_send_ty)] | ||
unsafe impl Send for HyperConnection {} | ||
|
||
#[monoio::main] | ||
async fn main() { | ||
println!("Running http client"); | ||
let connector = HyperConnector; | ||
let client = hyper::Client::builder() | ||
.executor(HyperExecutor) | ||
.build::<HyperConnector, hyper::Body>(connector); | ||
let res = client | ||
.get("http://127.0.0.1:23300/monoio".parse().unwrap()) | ||
.await | ||
.expect("failed to fetch"); | ||
println!("Response status: {}", res.status()); | ||
let body = hyper::body::to_bytes(res.into_body()) | ||
.await | ||
.expect("failed to read body"); | ||
let body = | ||
String::from_utf8(body.into_iter().collect()).expect("failed to convert body to string"); | ||
println!("Response body: {body}"); | ||
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().unwrap(); | ||
fetch_url(url).await.unwrap(); | ||
} |
Oops, something went wrong.