From 3bcabef55e8c6891a6e01bd8c33455d258b7f0cf Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Wed, 14 Feb 2024 12:40:18 +0000 Subject: [PATCH] Address feedback: Add and use WithBufer trait --- src/fs/file.rs | 2 +- src/io/read.rs | 6 +-- src/io/readv.rs | 6 +-- src/io/recv_from.rs | 6 +-- src/io/recvmsg.rs | 6 +-- src/io/send_to.rs | 11 +---- src/io/send_zc.rs | 12 ++---- src/io/write.rs | 6 +-- src/io/write_fixed.rs | 11 +---- src/io/writev.rs | 11 +---- src/lib.rs | 82 +---------------------------------- src/types.rs | 99 +++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 122 insertions(+), 136 deletions(-) create mode 100644 src/types.rs diff --git a/src/fs/file.rs b/src/fs/file.rs index 9c3944cf..ebf061ad 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::sealed::MapResultBuf; +use crate::sealed::MapResult; use crate::{UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; diff --git a/src/io/read.rs b/src/io/read.rs index 37b11a3a..d87af8c1 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,5 +1,6 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; +use crate::sealed::WithBuffer; use crate::Result; use crate::runtime::driver::op::{Completable, CqeResult, Op}; @@ -59,9 +60,6 @@ where } } - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + res.with_buffer(buf) } } diff --git a/src/io/readv.rs b/src/io/readv.rs index 859f9c42..44843a21 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,4 +1,5 @@ use crate::buf::BoundedBufMut; +use crate::sealed::WithBuffer; use crate::Result; use crate::io::SharedFd; @@ -87,9 +88,6 @@ where assert_eq!(count, 0); } - match res { - Ok(n) => Ok((n, bufs)), - Err(e) => Err(crate::Error(e, bufs)), - } + res.with_buffer(bufs) } } diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index c9336271..f7537801 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ @@ -78,9 +79,6 @@ where (n, socket_addr) }); - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + res.with_buffer(buf) } } diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 67175328..71dc8215 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ @@ -92,9 +93,6 @@ where (n, socket_addr) }); - match res { - Ok(n) => Ok((n, bufs)), - Err(e) => Err(crate::Error(e, bufs)), - } + res.with_buffer(bufs) } } diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 805980ae..e41cd399 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,6 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::Result; use socket2::SockAddr; use std::io::IoSlice; @@ -73,14 +74,6 @@ impl Completable for SendTo { type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index 2602567b..838e1801 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use std::io; @@ -42,14 +43,9 @@ impl Completable for SendZc { type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| self.bytes + v as usize); - // Recover the buffer - let buf = self.buf; - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result + .map(|v| self.bytes + v as usize) + .with_buffer(self.buf) } } diff --git a/src/io/write.rs b/src/io/write.rs index f903f4af..72bee1d8 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,3 +1,4 @@ +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; @@ -31,10 +32,7 @@ impl OneshotOutputTransform for WriteTransform { Err(io::Error::from_raw_os_error(-cqe.result())) }; - match res { - Ok(n) => Ok((n, data.buf)), - Err(e) => Err(crate::Error(e, data.buf)), - } + res.with_buffer(data.buf) } } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index acf40f30..d99e74d0 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,6 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; +use crate::sealed::WithBuffer; use crate::Result; use crate::runtime::CONTEXT; @@ -51,14 +52,6 @@ impl Completable for WriteFixed { type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/writev.rs b/src/io/writev.rs index 33b34abe..e73de206 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use libc::iovec; use std::io; @@ -61,14 +62,6 @@ where type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.bufs; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.bufs) } } diff --git a/src/lib.rs b/src/lib.rs index a9f5f9c9..5405c9c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ macro_rules! syscall { mod future; mod io; mod runtime; +mod types; pub mod buf; pub mod fs; @@ -81,9 +82,9 @@ pub use io::write::*; pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; +pub use types::*; use crate::runtime::driver::op::Op; -use std::fmt::{Debug, Display}; use std::future::Future; /// Starts an `io_uring` enabled Tokio runtime. @@ -232,85 +233,6 @@ impl Builder { } } -/// A specialized `Result` type for `io-uring` operations with buffers. -/// -/// This type is used as a return value for asynchronous `io-uring` methods that -/// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned both in the success tuple and as part of the error. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::open("hello.txt").await?; -/// -/// let buf = vec![0; 4096]; -/// // Read some data, the buffer is passed by ownership and -/// // submitted to the kernel. When the operation completes, -/// // we get the buffer back. -/// let (n, buf) = file.read_at(buf, 0).await?; -/// -/// // Display the contents -/// println!("{:?}", &buf[..n]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub type Result = std::result::Result<(T, B), Error>; - -/// A specialized `Error` type for `io-uring` operations with buffers. -pub struct Error(pub std::io::Error, pub B); -impl Debug for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Debug::fmt(&self.0, f) - } -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.0, f) - } -} - -impl std::error::Error for Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&self.0) - } -} - -impl Error { - /// Applies a function to the contained buffer, returning a new `BufError`. - pub fn map(self, f: F) -> Error - where - F: FnOnce(B) -> U, - { - Error(self.0, f(self.1)) - } -} - -mod sealed { - /// A Specialized trait for mapping over the buffer in both sides of a Result - pub trait MapResultBuf { - type Output; - fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; - } -} - -impl sealed::MapResultBuf for Result { - type Output = Result; - fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { - match self { - Ok((r, b)) => Ok((r, f(b))), - Err(e) => Err(e.map(f)), - } - } -} - /// The simplest possible operation. Just posts a completion event, nothing else. /// /// This has a place in benchmarking and sanity checking uring. diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 00000000..c063eddd --- /dev/null +++ b/src/types.rs @@ -0,0 +1,99 @@ +use std::fmt::{Debug, Display}; + +/// A specialized `Result` type for `io-uring` operations with buffers. +/// +/// This type is used as a return value for asynchronous `io-uring` methods that +/// require passing ownership of a buffer to the runtime. When the operation +/// completes, the buffer is returned both in the success tuple and as part of the error. +/// +/// # Examples +/// +/// ```no_run +/// use tokio_uring::fs::File; +/// +/// fn main() -> Result<(), Box> { +/// tokio_uring::start(async { +/// // Open a file +/// let file = File::open("hello.txt").await?; +/// +/// let buf = vec![0; 4096]; +/// // Read some data, the buffer is passed by ownership and +/// // submitted to the kernel. When the operation completes, +/// // we get the buffer back. +/// let (n, buf) = file.read_at(buf, 0).await?; +/// +/// // Display the contents +/// println!("{:?}", &buf[..n]); +/// +/// Ok(()) +/// }) +/// } +/// ``` +pub type Result = std::result::Result<(T, B), Error>; + +/// A specialized `Error` type for `io-uring` operations with buffers. +pub struct Error(pub std::io::Error, pub B); +impl Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.0) + } +} + +impl Error { + /// Applies a function to the contained buffer, returning a new `BufError`. + pub fn map(self, f: F) -> Error + where + F: FnOnce(B) -> U, + { + Error(self.0, f(self.1)) + } +} + +pub(super) mod sealed { + /// A Specialized trait for mapping over the buffer in both sides of a Result + pub trait MapResult { + type Output; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; + } + + /// Adapter trait to convert result::Result to crate::Result where E can be + /// converted to std::io::Error. + pub trait WithBuffer: Sized { + fn with_buffer(self, buf: B) -> T; + } +} + +impl sealed::MapResult for Result { + type Output = Result; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { + match self { + Ok((r, b)) => Ok((r, f(b))), + Err(e) => Err(e.map(f)), + } + } +} + +/// Adaptor implementation for Result to Result. +impl sealed::WithBuffer, B> for std::result::Result +where + E: Into, +{ + fn with_buffer(self, buf: B) -> Result { + match self { + Ok(res) => Ok((res, buf)), + Err(e) => Err(crate::Error(e.into(), buf)), + } + } +}