Skip to content

Commit

Permalink
Address feedback: Add and use WithBufer trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ollie-etl committed Feb 14, 2024
1 parent ed5faa9 commit 3bcabef
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 136 deletions.
2 changes: 1 addition & 1 deletion src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::sealed::MapResultBuf;
use crate::sealed::MapResult;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use std::fmt;
use std::io;
Expand Down
6 changes: 2 additions & 4 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::sealed::WithBuffer;
use crate::Result;

use crate::runtime::driver::op::{Completable, CqeResult, Op};
Expand Down Expand Up @@ -59,9 +60,6 @@ where
}
}

match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
res.with_buffer(buf)
}
}
6 changes: 2 additions & 4 deletions src/io/readv.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::buf::BoundedBufMut;
use crate::sealed::WithBuffer;
use crate::Result;

use crate::io::SharedFd;
Expand Down Expand Up @@ -87,9 +88,6 @@ where
assert_eq!(count, 0);
}

match res {
Ok(n) => Ok((n, bufs)),
Err(e) => Err(crate::Error(e, bufs)),
}
res.with_buffer(bufs)
}
}
6 changes: 2 additions & 4 deletions src/io/recv_from.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::sealed::WithBuffer;
use crate::{buf::BoundedBufMut, io::SharedFd, Result};
use socket2::SockAddr;
use std::{
Expand Down Expand Up @@ -78,9 +79,6 @@ where
(n, socket_addr)
});

match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
res.with_buffer(buf)
}
}
6 changes: 2 additions & 4 deletions src/io/recvmsg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::sealed::WithBuffer;
use crate::{buf::BoundedBufMut, io::SharedFd, Result};
use socket2::SockAddr;
use std::{
Expand Down Expand Up @@ -92,9 +93,6 @@ where
(n, socket_addr)
});

match res {
Ok(n) => Ok((n, bufs)),
Err(e) => Err(crate::Error(e, bufs)),
}
res.with_buffer(bufs)
}
}
11 changes: 2 additions & 9 deletions src/io/send_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::buf::BoundedBuf;
use crate::io::SharedFd;
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::sealed::WithBuffer;
use crate::Result;
use socket2::SockAddr;
use std::io::IoSlice;
Expand Down Expand Up @@ -73,14 +74,6 @@ impl<T> Completable for SendTo<T> {
type Output = Result<usize, T>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let buf = self.buf;

match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
cqe.result.map(|v| v as usize).with_buffer(self.buf)
}
}
12 changes: 4 additions & 8 deletions src/io/send_zc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable};
use crate::runtime::CONTEXT;
use crate::sealed::WithBuffer;
use crate::{buf::BoundedBuf, io::SharedFd, Result};
use std::io;

Expand Down Expand Up @@ -42,14 +43,9 @@ impl<T> Completable for SendZc<T> {
type Output = Result<usize, T>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| self.bytes + v as usize);
// Recover the buffer
let buf = self.buf;
match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
cqe.result
.map(|v| self.bytes + v as usize)
.with_buffer(self.buf)
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/io/write.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::sealed::WithBuffer;
use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot};
use io_uring::cqueue::Entry;
use std::io;
Expand Down Expand Up @@ -31,10 +32,7 @@ impl<T> OneshotOutputTransform for WriteTransform<T> {
Err(io::Error::from_raw_os_error(-cqe.result()))
};

match res {
Ok(n) => Ok((n, data.buf)),
Err(e) => Err(crate::Error(e, data.buf)),
}
res.with_buffer(data.buf)
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/io/write_fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::buf::fixed::FixedBuf;
use crate::buf::BoundedBuf;
use crate::io::SharedFd;
use crate::runtime::driver::op::{self, Completable, Op};
use crate::sealed::WithBuffer;
use crate::Result;

use crate::runtime::CONTEXT;
Expand Down Expand Up @@ -51,14 +52,6 @@ impl<T> Completable for WriteFixed<T> {
type Output = Result<usize, T>;

fn complete(self, cqe: op::CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let buf = self.buf;

match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
cqe.result.map(|v| v as usize).with_buffer(self.buf)
}
}
11 changes: 2 additions & 9 deletions src/io/writev.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::sealed::WithBuffer;
use crate::{buf::BoundedBuf, io::SharedFd, Result};
use libc::iovec;
use std::io;
Expand Down Expand Up @@ -61,14 +62,6 @@ where
type Output = Result<usize, Vec<T>>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let buf = self.bufs;

match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(crate::Error(e, buf)),
}
cqe.result.map(|v| v as usize).with_buffer(self.bufs)
}
}
82 changes: 2 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ macro_rules! syscall {
mod future;
mod io;
mod runtime;
mod types;

pub mod buf;
pub mod fs;
Expand All @@ -81,9 +82,9 @@ pub use io::write::*;
pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot};
pub use runtime::spawn;
pub use runtime::Runtime;
pub use types::*;

use crate::runtime::driver::op::Op;
use std::fmt::{Debug, Display};
use std::future::Future;

/// Starts an `io_uring` enabled Tokio runtime.
Expand Down Expand Up @@ -232,85 +233,6 @@ impl Builder {
}
}

/// A specialized `Result` type for `io-uring` operations with buffers.
///
/// This type is used as a return value for asynchronous `io-uring` methods that
/// require passing ownership of a buffer to the runtime. When the operation
/// completes, the buffer is returned both in the success tuple and as part of the error.
///
/// # Examples
///
/// ```no_run
/// use tokio_uring::fs::File;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
/// // Open a file
/// let file = File::open("hello.txt").await?;
///
/// let buf = vec![0; 4096];
/// // Read some data, the buffer is passed by ownership and
/// // submitted to the kernel. When the operation completes,
/// // we get the buffer back.
/// let (n, buf) = file.read_at(buf, 0).await?;
///
/// // Display the contents
/// println!("{:?}", &buf[..n]);
///
/// Ok(())
/// })
/// }
/// ```
pub type Result<T, B> = std::result::Result<(T, B), Error<B>>;

/// A specialized `Error` type for `io-uring` operations with buffers.
pub struct Error<B>(pub std::io::Error, pub B);
impl<T> Debug for Error<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.0, f)
}
}

impl<T> Display for Error<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

impl<T> std::error::Error for Error<T> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}

impl<B> Error<B> {
/// Applies a function to the contained buffer, returning a new `BufError`.
pub fn map<F, U>(self, f: F) -> Error<U>
where
F: FnOnce(B) -> U,
{
Error(self.0, f(self.1))
}
}

mod sealed {
/// A Specialized trait for mapping over the buffer in both sides of a Result<T,B>
pub trait MapResultBuf<B, U> {
type Output;
fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output;
}
}

impl<T, B, U> sealed::MapResultBuf<B, U> for Result<T, B> {
type Output = Result<T, U>;
fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output {
match self {
Ok((r, b)) => Ok((r, f(b))),
Err(e) => Err(e.map(f)),
}
}
}

/// The simplest possible operation. Just posts a completion event, nothing else.
///
/// This has a place in benchmarking and sanity checking uring.
Expand Down
99 changes: 99 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::fmt::{Debug, Display};

/// A specialized `Result` type for `io-uring` operations with buffers.
///
/// This type is used as a return value for asynchronous `io-uring` methods that
/// require passing ownership of a buffer to the runtime. When the operation
/// completes, the buffer is returned both in the success tuple and as part of the error.
///
/// # Examples
///
/// ```no_run
/// use tokio_uring::fs::File;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
/// // Open a file
/// let file = File::open("hello.txt").await?;
///
/// let buf = vec![0; 4096];
/// // Read some data, the buffer is passed by ownership and
/// // submitted to the kernel. When the operation completes,
/// // we get the buffer back.
/// let (n, buf) = file.read_at(buf, 0).await?;
///
/// // Display the contents
/// println!("{:?}", &buf[..n]);
///
/// Ok(())
/// })
/// }
/// ```
pub type Result<T, B> = std::result::Result<(T, B), Error<B>>;

/// A specialized `Error` type for `io-uring` operations with buffers.
pub struct Error<B>(pub std::io::Error, pub B);
impl<T> Debug for Error<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.0, f)
}
}

impl<T> Display for Error<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

impl<T> std::error::Error for Error<T> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}

impl<B> Error<B> {
/// Applies a function to the contained buffer, returning a new `BufError`.
pub fn map<F, U>(self, f: F) -> Error<U>
where
F: FnOnce(B) -> U,
{
Error(self.0, f(self.1))
}
}

pub(super) mod sealed {
/// A Specialized trait for mapping over the buffer in both sides of a Result<T,B>
pub trait MapResult<B, U> {
type Output;
fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output;
}

/// Adapter trait to convert result::Result<T, E> to crate::Result<T, B> where E can be
/// converted to std::io::Error.
pub trait WithBuffer<T, B>: Sized {
fn with_buffer(self, buf: B) -> T;
}
}

impl<T, B, U> sealed::MapResult<B, U> for Result<T, B> {
type Output = Result<T, U>;
fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output {
match self {
Ok((r, b)) => Ok((r, f(b))),
Err(e) => Err(e.map(f)),
}
}
}

/// Adaptor implementation for Result<T, E> to Result<T, B>.
impl<T, B, E> sealed::WithBuffer<crate::Result<T, B>, B> for std::result::Result<T, E>
where
E: Into<std::io::Error>,
{
fn with_buffer(self, buf: B) -> Result<T, B> {
match self {
Ok(res) => Ok((res, buf)),
Err(e) => Err(crate::Error(e.into(), buf)),
}
}
}

0 comments on commit 3bcabef

Please sign in to comment.