diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b07fb910..26421d76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,13 +101,9 @@ jobs: - target: x86_64-pc-windows-msvc os: windows-latest - no_run: 1 - target: x86_64-pc-windows-gnu os: windows-latest - no_run: 1 - target: i686-pc-windows-msvc os: windows-latest - no_run: 1 - target: i686-pc-windows-gnu os: windows-latest - no_run: 1 diff --git a/examples/echo.rs b/examples/echo.rs index 7ffb4ec7..62677935 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -3,6 +3,7 @@ //! Run the example and `nc 127.0.0.1 50002` in another shell. //! All your input will be echoed out. +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRentExt}, net::{TcpListener, TcpStream}, @@ -28,6 +29,7 @@ async fn main() { } } +#[cfg(unix)] async fn echo(mut stream: TcpStream) -> std::io::Result<()> { let mut buf: Vec = Vec::with_capacity(8 * 1024); let mut res; diff --git a/examples/echo_poll.rs b/examples/echo_poll.rs index cd1be5c1..9e8e18e0 100644 --- a/examples/echo_poll.rs +++ b/examples/echo_poll.rs @@ -2,7 +2,7 @@ //! //! Run the example and `nc 127.0.0.1 50002` in another shell. //! All your input will be echoed out. - +#[cfg(unix)] use monoio::{ io::{ poll_io::{AsyncReadExt, AsyncWriteExt}, @@ -30,6 +30,7 @@ async fn main() { } } +#[cfg(unix)] async fn echo(stream: TcpStream) -> std::io::Result<()> { // Convert completion-based io to poll-based io(which impl tokio::io) let mut stream = stream.into_poll_io()?; diff --git a/examples/h2_server.rs b/examples/h2_server.rs index d44134b6..4d852bc0 100644 --- a/examples/h2_server.rs +++ b/examples/h2_server.rs @@ -1,11 +1,13 @@ //! Example for using h2 directly. //! Example code is modified from https://github.com/hyperium/h2/blob/master/examples/server.rs. +#[cfg(unix)] use monoio::{ io::IntoPollIo, net::{TcpListener, TcpStream}, }; +#[cfg(unix)] async fn serve(io: TcpStream) -> Result<(), Box> { let io = io.into_poll_io()?; let mut connection = h2::server::handshake(io).await?; @@ -24,6 +26,7 @@ async fn serve(io: TcpStream) -> Result<(), Box, mut respond: h2::server::SendResponse, diff --git a/examples/hyper_client.rs b/examples/hyper_client.rs index 4bb86539..b484ee3b 100644 --- a/examples/hyper_client.rs +++ b/examples/hyper_client.rs @@ -2,17 +2,20 @@ //! //! It will try to fetch http://httpbin.org/ip and print the //! response. - -use std::io::Write; - -use bytes::Bytes; -use http_body_util::{BodyExt, Empty}; -use hyper::Request; -use monoio::{io::IntoPollIo, net::TcpStream}; -use monoio_compat::hyper::MonoioIo; - +#[cfg(unix)] +use { + bytes::Bytes, + http_body_util::{BodyExt, Empty}, + hyper::Request, + monoio::{io::IntoPollIo, net::TcpStream}, + monoio_compat::hyper::MonoioIo, + std::io::Write, +}; + +#[cfg(unix)] type Result = std::result::Result>; +#[cfg(unix)] async fn fetch_url(url: hyper::Uri) -> Result<()> { let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index 40d11ed2..66549168 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -2,20 +2,22 @@ //! //! After running this example, you can open http://localhost:23300 //! and http://localhost:23300/monoio in your browser or curl it. +#[cfg(unix)] +use { + bytes::Bytes, + futures::Future, + http_body_util::Full, + hyper::{server::conn::http1, service::service_fn, Method, Request, Response, StatusCode}, + monoio::{io::IntoPollIo, net::TcpListener}, +}; -use std::net::SocketAddr; - -use bytes::Bytes; -use futures::Future; -use hyper::{server::conn::http1, service::service_fn}; -use monoio::{io::IntoPollIo, net::TcpListener}; - +#[cfg(unix)] pub(crate) async fn serve_http(addr: A, service: S) -> std::io::Result<()> where S: Copy + Fn(Request) -> F + 'static, F: Future>, E>> + 'static, E: std::error::Error + 'static + Send + Sync, - A: Into, + A: Into, { let listener = TcpListener::bind(addr.into())?; loop { @@ -35,9 +37,7 @@ where } } -use http_body_util::Full; -use hyper::{Method, Request, Response, StatusCode}; - +#[cfg(unix)] async fn hyper_handler( req: Request, ) -> Result>, std::convert::Infallible> { diff --git a/examples/join.rs b/examples/join.rs index f55f79f7..f5ad53f6 100644 --- a/examples/join.rs +++ b/examples/join.rs @@ -13,6 +13,7 @@ async fn main() { println!("monoio::join two tasks"); } +#[cfg(unix)] async fn ready_now() -> u8 { 7 } diff --git a/examples/proxy.rs b/examples/proxy.rs index e8429d1e..c8471550 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -1,5 +1,5 @@ //! An example TCP proxy. - +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt, Splitable}, net::{TcpListener, TcpStream}, @@ -35,6 +35,7 @@ async fn main() { } } +#[cfg(unix)] async fn copy_one_direction( mut from: FROM, to: &mut TO, diff --git a/examples/timer_select.rs b/examples/timer_select.rs index 47afd029..21ae7c08 100644 --- a/examples/timer_select.rs +++ b/examples/timer_select.rs @@ -1,9 +1,10 @@ //! An example to illustrate selecting by macro or manually. - -use std::{future::Future, pin::pin}; - -use monoio::{io::Canceller, net::TcpListener, time::Duration}; -use pin_project_lite::pin_project; +#[cfg(unix)] +use { + monoio::{io::Canceller, net::TcpListener, time::Duration}, + pin_project_lite::pin_project, + std::{future::Future, pin::pin}, +}; #[monoio::main(enable_timer = true)] async fn main() { @@ -65,6 +66,7 @@ async fn main() { } } +#[cfg(unix)] pin_project! { struct DualSelect { #[pin] @@ -74,6 +76,7 @@ pin_project! { } } +#[cfg(unix)] impl Future for DualSelect where F: Future, diff --git a/examples/uds.rs b/examples/uds.rs index 9db4edf6..883498ac 100644 --- a/examples/uds.rs +++ b/examples/uds.rs @@ -1,6 +1,6 @@ //! A example to show how to use UnixStream. - use local_sync::oneshot::channel; +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRentExt}, net::{UnixListener, UnixStream}, diff --git a/monoio-macros/src/entry.rs b/monoio-macros/src/entry.rs index ed9d7a6a..934e5445 100644 --- a/monoio-macros/src/entry.rs +++ b/monoio-macros/src/entry.rs @@ -349,16 +349,19 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To } else { quote! {} }; + // todo + // disable some tests in windows, after the features are completed, it is necessary to rollback + // the code here let cfg_attr = if is_test { match config.driver { DriverType::Legacy => quote! { - #[cfg(feature = "legacy")] + #[cfg(all(feature = "legacy", not(windows)))] }, DriverType::Uring => quote! { #[cfg(all(target_os = "linux", feature = "iouring"))] }, DriverType::Fusion => quote! { - #[cfg(any(feature = "legacy", feature = "iouring"))] + #[cfg(all(any(feature = "legacy", feature = "iouring"), not(windows, feature = "sync")))] }, } } else { diff --git a/monoio-macros/src/lib.rs b/monoio-macros/src/lib.rs index 83d7477c..674d8a24 100644 --- a/monoio-macros/src/lib.rs +++ b/monoio-macros/src/lib.rs @@ -11,6 +11,7 @@ mod entry; mod select; use proc_macro::TokenStream; + #[cfg(unix)] #[proc_macro_attribute] pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { @@ -19,8 +20,26 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { #[cfg(windows)] #[proc_macro_attribute] -pub fn main(_args: TokenStream, _item: TokenStream) -> TokenStream { - unimplemented!() +pub fn main(_args: TokenStream, func: TokenStream) -> TokenStream { + use quote::quote; + use syn::parse_macro_input; + + let func = parse_macro_input!(func as syn::ItemFn); + let func_vis = &func.vis; // like pub + + let func_decl = func.sig; + let func_name = &func_decl.ident; // function name + let func_generics = &func_decl.generics; + let func_inputs = &func_decl.inputs; + let _func_output = &func_decl.output; + + let caller = quote! { + // rebuild the function + #func_vis fn #func_name #func_generics(#func_inputs) { + println!("macros unimplemented in windows!"); + } + }; + caller.into() } #[proc_macro_attribute] diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 6a5cce1c..af0af105 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -39,13 +39,14 @@ once_cell = { version = "1.19.0", optional = true } # windows dependencies(will be added when windows support finished) [target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.48.0", features = [ +windows-sys = { version = "0.52.0", features = [ "Win32_Foundation", "Win32_Networking_WinSock", "Win32_System_IO", "Win32_Storage_FileSystem", "Win32_Security" ] } +polling = { version = "2.8.0", optional = true } # unix dependencies [target.'cfg(unix)'.dependencies] @@ -78,13 +79,13 @@ utils = ["nix"] # enable debug if you want to know what runtime does debug = ["tracing"] # enable legacy driver support(will make monoio available for older kernel and macOS) -legacy = ["mio"] +legacy = ["mio", "polling", "once_cell"] # iouring support iouring = ["io-uring"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) tokio-compat = ["tokio"] # (experimental)enable poll-io to convert structs to structs that impl tokio's poll io -poll-io = ["tokio", "mio"] +poll-io = ["tokio", "mio", "polling"] # signal enables setting ctrl_c handler signal = ["ctrlc", "sync"] signal-termination = ["signal", "ctrlc/termination"] diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index d28d0dbf..47b9da57 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -187,7 +187,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, not(windows)))] mod tests { use super::DefaultThreadPool; diff --git a/monoio/src/buf/mod.rs b/monoio/src/buf/mod.rs index 570aba43..a30825a5 100644 --- a/monoio/src/buf/mod.rs +++ b/monoio/src/buf/mod.rs @@ -19,7 +19,11 @@ mod raw_buf; pub use raw_buf::{RawBuf, RawBufVectored}; mod vec_wrapper; -pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta}; +#[allow(unused_imports)] +pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta, IoVecMeta}; + +mod msg; +pub use msg::{MsgBuf, MsgBufMut, MsgMeta}; pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { // Safety: the `IoBuf` trait is marked as unsafe and is expected to be diff --git a/monoio/src/buf/msg.rs b/monoio/src/buf/msg.rs new file mode 100644 index 00000000..d95ad3c4 --- /dev/null +++ b/monoio/src/buf/msg.rs @@ -0,0 +1,124 @@ +use std::ops::{Deref, DerefMut}; + +#[cfg(unix)] +use libc::msghdr; +#[cfg(windows)] +use windows_sys::Win32::Networking::WinSock::WSAMSG; + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBuf: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG; +} + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBufMut: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG; +} + +#[allow(missing_docs)] +pub struct MsgMeta { + #[cfg(unix)] + pub(crate) data: msghdr, + #[cfg(windows)] + pub(crate) data: WSAMSG, +} + +unsafe impl MsgBuf for MsgMeta { + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr { + &self.data + } + + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG { + &self.data + } +} + +unsafe impl MsgBufMut for MsgMeta { + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr { + &mut self.data + } + + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG { + &mut self.data + } +} + +#[cfg(unix)] +impl From for MsgMeta { + fn from(data: msghdr) -> Self { + Self { data } + } +} + +#[cfg(unix)] +impl Deref for MsgMeta { + type Target = msghdr; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(unix)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +#[cfg(windows)] +impl From for MsgMeta { + fn from(data: WSAMSG) -> Self { + Self { data } + } +} + +#[cfg(windows)] +impl Deref for MsgMeta { + type Target = WSAMSG; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(windows)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 3da704b5..70a3ed5a 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -1,7 +1,7 @@ #[cfg(windows)] use {std::ops::Add, windows_sys::Win32::Networking::WinSock::WSABUF}; -use super::{IoVecBuf, IoVecBufMut}; +use super::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}; pub(crate) struct IoVecMeta { #[cfg(unix)] @@ -206,6 +206,44 @@ unsafe impl IoVecBufMut for IoVecMeta { } } +impl<'t, T: IoBuf> From<&'t T> for IoVecMeta { + fn from(buf: &'t T) -> Self { + let ptr = buf.read_ptr() as *const _ as *mut _; + let len = buf.bytes_init() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + +impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta { + fn from(buf: &'t mut T) -> Self { + let ptr = buf.write_ptr() as *mut _; + let len = buf.bytes_total() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + #[cfg(unix)] #[cfg(test)] mod tests { diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 4734c998..b6a7c9f0 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -2,9 +2,9 @@ use std::{io, marker::PhantomData}; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::driver::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] use crate::driver::LegacyDriver; -#[cfg(all(unix, any(feature = "legacy", feature = "iouring")))] +#[cfg(any(feature = "legacy", feature = "iouring"))] use crate::utils::thread_id::gen_id; use crate::{ driver::Driver, @@ -80,14 +80,14 @@ macro_rules! direct_build { direct_build!(IoUringDriver); #[cfg(all(target_os = "linux", feature = "iouring"))] direct_build!(TimeDriver); -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] direct_build!(LegacyDriver); -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] direct_build!(TimeDriver); // ===== builder impl ===== -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] impl Buildable for LegacyDriver { fn build(this: RuntimeBuilder) -> io::Result> { let thread_id = gen_id(); @@ -192,7 +192,7 @@ impl RuntimeBuilder { } /// Build the runtime. - #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] + #[cfg(not(all(target_os = "linux", feature = "iouring")))] pub fn build(self) -> io::Result> { let builder = RuntimeBuilder:: { entries: self.entries, @@ -248,7 +248,7 @@ impl RuntimeBuilder> { } /// Build the runtime. - #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] + #[cfg(not(all(target_os = "linux", feature = "iouring")))] pub fn build(self) -> io::Result>> { let builder = RuntimeBuilder::> { entries: self.entries, @@ -280,7 +280,7 @@ mod time_wrap { #[cfg(all(target_os = "linux", feature = "iouring"))] impl time_wrap::TimeWrapable for IoUringDriver {} -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] impl time_wrap::TimeWrapable for LegacyDriver {} #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] impl time_wrap::TimeWrapable for FusionDriver {} diff --git a/monoio/src/driver/legacy/iocp/afd.rs b/monoio/src/driver/legacy/iocp/afd.rs deleted file mode 100644 index 05d73056..00000000 --- a/monoio/src/driver/legacy/iocp/afd.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::{ - ffi::c_void, - fs::File, - os::windows::prelude::{AsRawHandle, FromRawHandle, RawHandle}, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use windows_sys::Win32::{ - Foundation::{ - RtlNtStatusToDosError, HANDLE, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_NOT_FOUND, - STATUS_PENDING, STATUS_SUCCESS, UNICODE_STRING, - }, - Storage::FileSystem::{ - NtCreateFile, SetFileCompletionNotificationModes, FILE_OPEN, FILE_SHARE_READ, - FILE_SHARE_WRITE, SYNCHRONIZE, - }, - System::WindowsProgramming::{ - NtDeviceIoControlFile, FILE_SKIP_SET_EVENT_ON_HANDLE, IO_STATUS_BLOCK, IO_STATUS_BLOCK_0, - OBJECT_ATTRIBUTES, - }, -}; - -use super::CompletionPort; - -#[link(name = "ntdll")] -extern "system" { - /// See - /// - /// This is an undocumented API and as such not part of - /// from which `windows-sys` is generated, and also unlikely to be added, so - /// we manually declare it here - fn NtCancelIoFileEx( - FileHandle: HANDLE, - IoRequestToCancel: *mut IO_STATUS_BLOCK, - IoStatusBlock: *mut IO_STATUS_BLOCK, - ) -> NTSTATUS; -} - -static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0); - -macro_rules! s { - ($($id:expr)+) => { - &[$($id as u16),+] - } -} - -pub const POLL_RECEIVE: u32 = 0b0_0000_0001; -pub const POLL_RECEIVE_EXPEDITED: u32 = 0b0_0000_0010; -pub const POLL_SEND: u32 = 0b0_0000_0100; -pub const POLL_DISCONNECT: u32 = 0b0_0000_1000; -pub const POLL_ABORT: u32 = 0b0_0001_0000; -pub const POLL_LOCAL_CLOSE: u32 = 0b0_0010_0000; -// Not used as it indicated in each event where a connection is connected, not -// just the first time a connection is established. -// Also see https://github.com/piscisaureus/wepoll/commit/8b7b340610f88af3d83f40fb728e7b850b090ece. -pub const POLL_CONNECT: u32 = 0b0_0100_0000; -pub const POLL_ACCEPT: u32 = 0b0_1000_0000; -pub const POLL_CONNECT_FAIL: u32 = 0b1_0000_0000; - -pub const KNOWN_EVENTS: u32 = POLL_RECEIVE - | POLL_RECEIVE_EXPEDITED - | POLL_SEND - | POLL_DISCONNECT - | POLL_ABORT - | POLL_LOCAL_CLOSE - | POLL_ACCEPT - | POLL_CONNECT_FAIL; - -#[repr(C)] -#[derive(Debug)] -pub struct AfdPollHandleInfo { - pub handle: HANDLE, - pub events: u32, - pub status: NTSTATUS, -} - -#[repr(C)] -#[derive(Debug)] -pub struct AfdPollInfo { - pub timeout: i64, - pub number_of_handles: u32, - pub exclusive: u32, - pub handles: [AfdPollHandleInfo; 1], -} - -#[derive(Debug)] -pub struct Afd { - file: File, -} - -impl Afd { - pub fn new(cp: &CompletionPort) -> std::io::Result { - const AFD_NAME: &[u16] = s!['\\' 'D' 'e' 'v' 'i' 'c' 'e' '\\' 'A' 'f' 'd' '\\' 'I' 'o']; - let mut device_name = UNICODE_STRING { - Length: std::mem::size_of_val(AFD_NAME) as u16, - MaximumLength: std::mem::size_of_val(AFD_NAME) as u16, - Buffer: AFD_NAME.as_ptr() as *mut u16, - }; - let mut device_attributes = OBJECT_ATTRIBUTES { - Length: std::mem::size_of::() as u32, - RootDirectory: 0, - ObjectName: &mut device_name, - Attributes: 0, - SecurityDescriptor: std::ptr::null_mut(), - SecurityQualityOfService: std::ptr::null_mut(), - }; - let mut handle = INVALID_HANDLE_VALUE; - let mut iosb = unsafe { std::mem::zeroed::() }; - let result = unsafe { - NtCreateFile( - &mut handle, - SYNCHRONIZE, - &mut device_attributes, - &mut iosb, - std::ptr::null_mut(), - 0, - FILE_SHARE_READ | FILE_SHARE_WRITE, - FILE_OPEN, - 0, - std::ptr::null_mut(), - 0, - ) - }; - - if result != STATUS_SUCCESS { - let error = unsafe { RtlNtStatusToDosError(result) }; - return Err(std::io::Error::from_raw_os_error(error as i32)); - } - - let file = unsafe { File::from_raw_handle(handle as RawHandle) }; - // Increment by 2 to reserve space for other types of handles. - // Non-AFD types (currently only NamedPipe), use odd numbered - // tokens. This allows the selector to differentiate between them - // and dispatch events accordingly. - let token = NEXT_TOKEN.fetch_add(2, Ordering::Relaxed) + 2; - cp.add_handle(token, file.as_raw_handle() as HANDLE)?; - let result = unsafe { - SetFileCompletionNotificationModes( - handle, - FILE_SKIP_SET_EVENT_ON_HANDLE as u8, // This is just 2, so fits in u8 - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(Self { file }) - } - } - - pub unsafe fn poll( - &self, - info: &mut AfdPollInfo, - iosb: *mut IO_STATUS_BLOCK, - overlapped: *mut c_void, - ) -> std::io::Result { - const IOCTL_AFD_POLL: u32 = 0x00012024; - let info_ptr = info as *mut _ as *mut c_void; - (*iosb).Anonymous.Status = STATUS_PENDING; - - let result = NtDeviceIoControlFile( - self.file.as_raw_handle() as HANDLE, - 0, - None, - overlapped, - iosb, - IOCTL_AFD_POLL, - info_ptr, - std::mem::size_of::() as u32, - info_ptr, - std::mem::size_of::() as u32, - ); - - match result { - STATUS_SUCCESS => Ok(true), - STATUS_PENDING => Ok(false), - status => { - let error = RtlNtStatusToDosError(status); - Err(std::io::Error::from_raw_os_error(error as i32)) - } - } - } - - pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> std::io::Result<()> { - if (*iosb).Anonymous.Status != STATUS_PENDING { - return Ok(()); - } - let mut cancel_iosb = IO_STATUS_BLOCK { - Anonymous: IO_STATUS_BLOCK_0 { Status: 0 }, - Information: 0, - }; - let status = NtCancelIoFileEx(self.file.as_raw_handle() as HANDLE, iosb, &mut cancel_iosb); - - if status == STATUS_SUCCESS || status == STATUS_NOT_FOUND { - Ok(()) - } else { - let error = RtlNtStatusToDosError(status); - Err(std::io::Error::from_raw_os_error(error as i32)) - } - } -} diff --git a/monoio/src/driver/legacy/iocp/core.rs b/monoio/src/driver/legacy/iocp/core.rs deleted file mode 100644 index d871fd10..00000000 --- a/monoio/src/driver/legacy/iocp/core.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::{ - os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}, - time::Duration, -}; - -use windows_sys::Win32::{ - Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}, - System::IO::{ - CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, - OVERLAPPED_ENTRY, - }, -}; - -#[derive(Debug)] -pub struct CompletionPort { - handle: HANDLE, -} - -impl CompletionPort { - pub fn new(value: u32) -> std::io::Result { - let handle = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, value) }; - - if handle == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(Self { handle }) - } - } - - pub fn add_handle(&self, token: usize, handle: HANDLE) -> std::io::Result<()> { - let result = unsafe { CreateIoCompletionPort(handle, self.handle, token, 0) }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) - } - } - - pub fn get_many<'a>( - &self, - entries: &'a mut [OVERLAPPED_ENTRY], - timeout: Option, - ) -> std::io::Result<&'a mut [OVERLAPPED_ENTRY]> { - let mut count = 0; - let result = unsafe { - GetQueuedCompletionStatusEx( - self.handle, - entries.as_mut_ptr(), - std::cmp::min(entries.len(), u32::max_value() as usize) as u32, - &mut count, - duration_millis(timeout), - 0, - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(&mut entries[..count as usize]) - } - } - - pub fn post(&self, entry: OVERLAPPED_ENTRY) -> std::io::Result<()> { - let result = unsafe { - PostQueuedCompletionStatus( - self.handle, - entry.dwNumberOfBytesTransferred, - entry.lpCompletionKey, - entry.lpOverlapped, - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) - } - } -} - -impl Drop for CompletionPort { - fn drop(&mut self) { - unsafe { CloseHandle(self.handle) }; - } -} - -impl AsRawHandle for CompletionPort { - fn as_raw_handle(&self) -> RawHandle { - self.handle as RawHandle - } -} - -impl FromRawHandle for CompletionPort { - unsafe fn from_raw_handle(handle: RawHandle) -> Self { - Self { - handle: handle as HANDLE, - } - } -} - -impl IntoRawHandle for CompletionPort { - fn into_raw_handle(self) -> RawHandle { - self.handle as RawHandle - } -} - -#[inline] -fn duration_millis(dur: Option) -> u32 { - if let Some(dur) = dur { - // `Duration::as_millis` truncates, so round up. This avoids - // turning sub-millisecond timeouts into a zero timeout, unless - // the caller explicitly requests that by specifying a zero - // timeout. - let dur_ms = dur - .checked_add(Duration::from_nanos(999_999)) - .unwrap_or(dur) - .as_millis(); - std::cmp::min(dur_ms, u32::MAX as u128) as u32 - } else { - u32::MAX - } -} diff --git a/monoio/src/driver/legacy/iocp/event.rs b/monoio/src/driver/legacy/iocp/event.rs deleted file mode 100644 index 0f962ff8..00000000 --- a/monoio/src/driver/legacy/iocp/event.rs +++ /dev/null @@ -1,120 +0,0 @@ -use mio::Token; -use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY; - -use super::afd; - -#[derive(Clone)] -pub struct Event { - pub flags: u32, - pub data: u64, -} - -impl Event { - pub fn new(token: Token) -> Event { - Event { - flags: 0, - data: usize::from(token) as u64, - } - } - - pub fn token(&self) -> Token { - Token(self.data as usize) - } - - pub fn set_readable(&mut self) { - self.flags |= afd::POLL_RECEIVE - } - - pub fn set_writable(&mut self) { - self.flags |= afd::POLL_SEND; - } - - pub fn from_entry(status: &OVERLAPPED_ENTRY) -> Event { - Event { - flags: status.dwNumberOfBytesTransferred, - data: status.lpCompletionKey as u64, - } - } - - pub fn to_entry(&self) -> OVERLAPPED_ENTRY { - OVERLAPPED_ENTRY { - dwNumberOfBytesTransferred: self.flags, - lpCompletionKey: self.data as usize, - lpOverlapped: std::ptr::null_mut(), - Internal: 0, - } - } - - pub fn is_readable(&self) -> bool { - self.flags & READABLE_FLAGS != 0 - } - - pub fn is_writable(&self) -> bool { - self.flags & WRITABLE_FLAGS != 0 - } - - pub fn is_error(&self) -> bool { - self.flags & ERROR_FLAGS != 0 - } - - pub fn is_read_closed(&self) -> bool { - self.flags & READ_CLOSED_FLAGS != 0 - } - - pub fn is_write_closed(&self) -> bool { - self.flags & WRITE_CLOSED_FLAGS != 0 - } - - pub fn is_priority(&self) -> bool { - self.flags & afd::POLL_RECEIVE_EXPEDITED != 0 - } -} - -pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE - | afd::POLL_DISCONNECT - | afd::POLL_ACCEPT - | afd::POLL_ABORT - | afd::POLL_CONNECT_FAIL; -pub(crate) const WRITABLE_FLAGS: u32 = afd::POLL_SEND | afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; -pub(crate) const ERROR_FLAGS: u32 = afd::POLL_CONNECT_FAIL; -pub(crate) const READ_CLOSED_FLAGS: u32 = - afd::POLL_DISCONNECT | afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; -pub(crate) const WRITE_CLOSED_FLAGS: u32 = afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; - -pub struct Events { - pub statuses: Box<[OVERLAPPED_ENTRY]>, - - pub events: Vec, -} - -impl Events { - pub fn with_capacity(cap: usize) -> Events { - Events { - statuses: unsafe { vec![std::mem::zeroed(); cap].into_boxed_slice() }, - events: Vec::with_capacity(cap), - } - } - - pub fn is_empty(&self) -> bool { - self.events.is_empty() - } - - pub fn capacity(&self) -> usize { - self.events.capacity() - } - - pub fn len(&self) -> usize { - self.events.len() - } - - pub fn get(&self, idx: usize) -> Option<&Event> { - self.events.get(idx) - } - - pub fn clear(&mut self) { - self.events.clear(); - for status in self.statuses.iter_mut() { - *status = unsafe { std::mem::zeroed() }; - } - } -} diff --git a/monoio/src/driver/legacy/iocp/mod.rs b/monoio/src/driver/legacy/iocp/mod.rs deleted file mode 100644 index 59dc8b37..00000000 --- a/monoio/src/driver/legacy/iocp/mod.rs +++ /dev/null @@ -1,312 +0,0 @@ -mod afd; -mod core; -mod event; -mod state; -mod waker; - -pub use core::*; -use std::{ - collections::VecDeque, - os::windows::prelude::RawSocket, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, - time::Duration, -}; - -pub use afd::*; -pub use event::*; -pub use state::*; -pub use waker::*; -use windows_sys::Win32::{ - Foundation::WAIT_TIMEOUT, - System::IO::{OVERLAPPED, OVERLAPPED_ENTRY}, -}; - -pub struct Poller { - is_polling: AtomicBool, - cp: CompletionPort, - update_queue: Mutex>>>>, - afd: Mutex>>, -} - -impl Poller { - pub fn new() -> std::io::Result { - Ok(Self { - is_polling: AtomicBool::new(false), - cp: CompletionPort::new(0)?, - update_queue: Mutex::new(VecDeque::new()), - afd: Mutex::new(Vec::new()), - }) - } - - pub fn poll(&self, events: &mut Events, timeout: Option) -> std::io::Result<()> { - events.clear(); - - if timeout.is_none() { - loop { - let len = self.poll_inner(&mut events.statuses, &mut events.events, None)?; - if len == 0 { - continue; - } - break Ok(()); - } - } else { - self.poll_inner(&mut events.statuses, &mut events.events, timeout)?; - Ok(()) - } - } - - pub fn poll_inner( - &self, - entries: &mut [OVERLAPPED_ENTRY], - events: &mut Vec, - timeout: Option, - ) -> std::io::Result { - self.is_polling.swap(true, Ordering::AcqRel); - - unsafe { self.update_sockets_events() }?; - - let result = self.cp.get_many(entries, timeout); - - self.is_polling.store(false, Ordering::Relaxed); - - match result { - Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }), - Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0), - Err(e) => Err(e), - } - } - - unsafe fn update_sockets_events(&self) -> std::io::Result<()> { - let mut queue = self.update_queue.lock().unwrap(); - for sock in queue.iter_mut() { - let mut sock_internal = sock.lock().unwrap(); - if !sock_internal.delete_pending { - sock_internal.update(sock)?; - } - } - - queue.retain(|sock| sock.lock().unwrap().error.is_some()); - - let mut afd = self.afd.lock().unwrap(); - afd.retain(|g| Arc::strong_count(g) > 1); - Ok(()) - } - - unsafe fn feed_events(&self, events: &mut Vec, entries: &[OVERLAPPED_ENTRY]) -> usize { - let mut n = 0; - let mut update_queue = self.update_queue.lock().unwrap(); - for entry in entries.iter() { - if entry.lpOverlapped.is_null() { - events.push(Event::from_entry(entry)); - n += 1; - continue; - } - - let sock_state = from_overlapped(entry.lpOverlapped); - let mut sock_guard = sock_state.lock().unwrap(); - if let Some(e) = sock_guard.feed_event() { - events.push(e); - n += 1; - } - - if !sock_guard.delete_pending { - update_queue.push_back(sock_state.clone()); - } - } - let mut afd = self.afd.lock().unwrap(); - afd.retain(|sock| Arc::strong_count(sock) > 1); - n - } - - pub fn register( - &self, - state: &mut SocketState, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - if state.inner.is_none() { - let flags = interests_to_afd_flags(interests); - - let inner = { - let sock = self._alloc_sock_for_rawsocket(state.socket)?; - let event = Event { - flags, - data: token.0 as u64, - }; - sock.lock().unwrap().set_event(event); - sock - }; - - self.queue_state(inner.clone()); - unsafe { self.update_sockets_events_if_polling()? }; - state.inner = Some(inner); - state.token = token; - state.interest = interests; - - Ok(()) - } else { - Err(std::io::ErrorKind::AlreadyExists.into()) - } - } - - pub fn reregister( - &self, - state: &mut SocketState, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - if let Some(inner) = state.inner.as_mut() { - { - let event = Event { - flags: interests_to_afd_flags(interests), - data: token.0 as u64, - }; - - inner.lock().unwrap().set_event(event); - } - - state.token = token; - state.interest = interests; - - self.queue_state(inner.clone()); - unsafe { self.update_sockets_events_if_polling() } - } else { - Err(std::io::ErrorKind::NotFound.into()) - } - } - - pub fn deregister(&mut self, state: &mut SocketState) -> std::io::Result<()> { - if let Some(inner) = state.inner.as_mut() { - { - let mut sock_state = inner.lock().unwrap(); - sock_state.mark_delete(); - } - state.inner = None; - Ok(()) - } else { - Err(std::io::ErrorKind::NotFound.into()) - } - } - - /// This function is called by register() and reregister() to start an - /// IOCTL_AFD_POLL operation corresponding to the registered events, but - /// only if necessary. - /// - /// Since it is not possible to modify or synchronously cancel an AFD_POLL - /// operation, and there can be only one active AFD_POLL operation per - /// (socket, completion port) pair at any time, it is expensive to change - /// a socket's event registration after it has been submitted to the kernel. - /// - /// Therefore, if no other threads are polling when interest in a socket - /// event is (re)registered, the socket is added to the 'update queue', but - /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred - /// until just before the GetQueuedCompletionStatusEx() syscall is made. - /// - /// However, when another thread is already blocked on - /// GetQueuedCompletionStatusEx() we tell the kernel about the registered - /// socket event(s) immediately. - unsafe fn update_sockets_events_if_polling(&self) -> std::io::Result<()> { - if self.is_polling.load(Ordering::Acquire) { - self.update_sockets_events() - } else { - Ok(()) - } - } - - fn queue_state(&self, sock_state: Pin>>) { - let mut update_queue = self.update_queue.lock().unwrap(); - update_queue.push_back(sock_state); - } - - fn _alloc_sock_for_rawsocket( - &self, - raw_socket: RawSocket, - ) -> std::io::Result>>> { - const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; - - let mut afd_group = self.afd.lock().unwrap(); - if afd_group.len() == 0 { - self._alloc_afd_group(&mut afd_group)?; - } else { - // + 1 reference in Vec - if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE { - self._alloc_afd_group(&mut afd_group)?; - } - } - let afd = match afd_group.last() { - Some(arc) => arc.clone(), - None => unreachable!("Cannot acquire afd"), - }; - - Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?))) - } - - fn _alloc_afd_group(&self, afd_group: &mut Vec>) -> std::io::Result<()> { - let afd = Afd::new(&self.cp)?; - let arc = Arc::new(afd); - afd_group.push(arc); - Ok(()) - } -} - -impl Drop for Poller { - fn drop(&mut self) { - loop { - let count: usize; - let mut statuses: [OVERLAPPED_ENTRY; 1024] = unsafe { std::mem::zeroed() }; - - let result = self - .cp - .get_many(&mut statuses, Some(std::time::Duration::from_millis(0))); - match result { - Ok(events) => { - count = events.iter().len(); - for event in events.iter() { - if event.lpOverlapped.is_null() { - } else { - // drain sock state to release memory of Arc reference - let _ = from_overlapped(event.lpOverlapped); - } - } - } - Err(_) => break, - } - - if count == 0 { - break; - } - } - - let mut afd_group = self.afd.lock().unwrap(); - afd_group.retain(|g| Arc::strong_count(g) > 1); - } -} - -pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin>> { - let sock_ptr: *const Mutex = ptr as *const _; - unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) } -} - -pub fn into_overlapped(sock_state: Pin>>) -> *mut std::ffi::c_void { - let overlapped_ptr: *const Mutex = - unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) }; - overlapped_ptr as *mut _ -} - -pub fn interests_to_afd_flags(interests: mio::Interest) -> u32 { - let mut flags = 0; - - if interests.is_readable() { - flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS; - } - - if interests.is_writable() { - flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS; - } - - flags -} diff --git a/monoio/src/driver/legacy/iocp/state.rs b/monoio/src/driver/legacy/iocp/state.rs deleted file mode 100644 index a550eb6e..00000000 --- a/monoio/src/driver/legacy/iocp/state.rs +++ /dev/null @@ -1,291 +0,0 @@ -use core::fmt::Debug; -use std::{ - marker::PhantomPinned, - os::windows::prelude::RawSocket, - pin::Pin, - sync::{Arc, Mutex}, -}; - -use windows_sys::Win32::{ - Foundation::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED}, - Networking::WinSock::{ - WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE, SIO_BSP_HANDLE_POLL, - SIO_BSP_HANDLE_SELECT, SOCKET_ERROR, - }, - System::WindowsProgramming::IO_STATUS_BLOCK, -}; - -use super::{afd, from_overlapped, into_overlapped, Afd, AfdPollInfo, Event}; - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SockPollStatus { - Idle, - Pending, - Cancelled, -} - -#[derive(Debug)] -pub struct SocketState { - pub socket: RawSocket, - pub inner: Option>>>, - pub token: mio::Token, - pub interest: mio::Interest, -} - -impl SocketState { - pub fn new(socket: RawSocket) -> Self { - Self { - socket, - inner: None, - token: mio::Token(0), - interest: mio::Interest::READABLE, - } - } -} - -pub struct SockState { - pub iosb: IO_STATUS_BLOCK, - pub poll_info: AfdPollInfo, - pub afd: Arc, - - pub base_socket: RawSocket, - - pub user_evts: u32, - pub pending_evts: u32, - - pub user_data: u64, - - pub poll_status: SockPollStatus, - pub delete_pending: bool, - - pub error: Option, - - _pinned: PhantomPinned, -} - -impl SockState { - pub fn new(raw_socket: RawSocket, afd: Arc) -> std::io::Result { - Ok(SockState { - iosb: unsafe { std::mem::zeroed() }, - poll_info: unsafe { std::mem::zeroed() }, - afd, - base_socket: get_base_socket(raw_socket)?, - user_evts: 0, - pending_evts: 0, - user_data: 0, - poll_status: SockPollStatus::Idle, - delete_pending: false, - error: None, - _pinned: PhantomPinned, - }) - } - - pub fn update(&mut self, self_arc: &Pin>>) -> std::io::Result<()> { - assert!(!self.delete_pending); - - // make sure to reset previous error before a new update - self.error = None; - - if let SockPollStatus::Pending = self.poll_status { - if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 { - // All the events the user is interested in are already being monitored by - // the pending poll operation. It might spuriously complete because of an - // event that we're no longer interested in; when that happens we'll submit - // a new poll operation with the updated event mask. - } else { - // A poll operation is already pending, but it's not monitoring for all the - // events that the user is interested in. Therefore, cancel the pending - // poll operation; when we receive it's completion package, a new poll - // operation will be submitted with the correct event mask. - if let Err(e) = self.cancel() { - self.error = e.raw_os_error(); - return Err(e); - } - return Ok(()); - } - } else if let SockPollStatus::Cancelled = self.poll_status { - // The poll operation has already been cancelled, we're still waiting for - // it to return. For now, there's nothing that needs to be done. - } else if let SockPollStatus::Idle = self.poll_status { - // No poll operation is pending; start one. - self.poll_info.exclusive = 0; - self.poll_info.number_of_handles = 1; - self.poll_info.timeout = i64::MAX; - self.poll_info.handles[0].handle = self.base_socket as HANDLE; - self.poll_info.handles[0].status = 0; - self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE; - - // Increase the ref count as the memory will be used by the kernel. - let overlapped_ptr = into_overlapped(self_arc.clone()); - - let result = unsafe { - self.afd - .poll(&mut self.poll_info, &mut self.iosb, overlapped_ptr) - }; - if let Err(e) = result { - let code = e.raw_os_error().unwrap(); - if code == ERROR_IO_PENDING as i32 { - // Overlapped poll operation in progress; this is expected. - } else { - // Since the operation failed it means the kernel won't be - // using the memory any more. - drop(from_overlapped(overlapped_ptr as *mut _)); - if code == ERROR_INVALID_HANDLE as i32 { - // Socket closed; it'll be dropped. - self.mark_delete(); - return Ok(()); - } else { - self.error = e.raw_os_error(); - return Err(e); - } - } - } - - self.poll_status = SockPollStatus::Pending; - self.pending_evts = self.user_evts; - } else { - unreachable!("Invalid poll status during update") - } - - Ok(()) - } - - pub fn feed_event(&mut self) -> Option { - self.poll_status = SockPollStatus::Idle; - self.pending_evts = 0; - - let mut afd_events = 0; - // We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is - // unsafe to use a pointer of IO_STATUS_BLOCK. - unsafe { - if self.delete_pending { - return None; - } else if self.iosb.Anonymous.Status == STATUS_CANCELLED { - // The poll request was cancelled by CancelIoEx. - } else if self.iosb.Anonymous.Status < 0 { - // The overlapped request itself failed in an unexpected way. - afd_events = afd::POLL_CONNECT_FAIL; - } else if self.poll_info.number_of_handles < 1 { - // This poll operation succeeded but didn't report any socket events. - } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 { - // The poll operation reported that the socket was closed. - self.mark_delete(); - return None; - } else { - afd_events = self.poll_info.handles[0].events; - } - } - - afd_events &= self.user_evts; - - if afd_events == 0 { - return None; - } - - self.user_evts &= !afd_events; - - Some(Event { - data: self.user_data, - flags: afd_events, - }) - } - - pub fn mark_delete(&mut self) { - if !self.delete_pending { - if let SockPollStatus::Pending = self.poll_status { - drop(self.cancel()); - } - - self.delete_pending = true; - } - } - - pub fn set_event(&mut self, ev: Event) -> bool { - // afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested - // by the caller. - let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT; - - self.user_evts = events; - self.user_data = ev.data; - - (events & !self.pending_evts) != 0 - } - - pub fn cancel(&mut self) -> std::io::Result<()> { - match self.poll_status { - SockPollStatus::Pending => {} - _ => unreachable!("Invalid poll status during cancel"), - }; - unsafe { - self.afd.cancel(&mut self.iosb)?; - } - self.poll_status = SockPollStatus::Cancelled; - self.pending_evts = 0; - Ok(()) - } -} - -impl Debug for SockState { - #[allow(unused_variables)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } -} - -impl Drop for SockState { - fn drop(&mut self) { - self.mark_delete(); - } -} - -fn get_base_socket(raw_socket: RawSocket) -> std::io::Result { - let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE); - if let Ok(base_socket) = res { - return Ok(base_socket); - } - - // The `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore - // it should not fail as long as `raw_socket` is a valid socket. See - // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls. - // However, at least one known LSP deliberately breaks it, so we try - // some alternative IOCTLs, starting with the most appropriate one. - for &ioctl in &[SIO_BSP_HANDLE_SELECT, SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE] { - if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) { - // Since we know now that we're dealing with an LSP (otherwise - // SIO_BASE_HANDLE would't have failed), only return any result - // when it is different from the original `raw_socket`. - if base_socket != raw_socket { - return Ok(base_socket); - } - } - } - - // If the alternative IOCTLs also failed, return the original error. - let os_error = res.unwrap_err(); - let err = std::io::Error::from_raw_os_error(os_error); - Err(err) -} - -fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result { - let mut base_socket: RawSocket = 0; - let mut bytes: u32 = 0; - let result = unsafe { - WSAIoctl( - raw_socket as usize, - ioctl, - std::ptr::null_mut(), - 0, - &mut base_socket as *mut _ as *mut std::ffi::c_void, - std::mem::size_of::() as u32, - &mut bytes, - std::ptr::null_mut(), - None, - ) - }; - - if result != SOCKET_ERROR { - Ok(base_socket) - } else { - Err(unsafe { WSAGetLastError() }) - } -} diff --git a/monoio/src/driver/legacy/iocp/waker.rs b/monoio/src/driver/legacy/iocp/waker.rs deleted file mode 100644 index d5aa9a99..00000000 --- a/monoio/src/driver/legacy/iocp/waker.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::{io, sync::Arc}; - -use super::{CompletionPort, Event, Poller}; - -#[derive(Debug)] -pub struct Waker { - token: mio::Token, - port: Arc, -} - -impl Waker { - #[allow(unreachable_code, unused_variables)] - pub fn new(poller: &Poller, token: mio::Token) -> io::Result { - Ok(Waker { - token, - // port: poller.cp.clone(), - port: unimplemented!(), - }) - } - - pub fn wake(&self) -> io::Result<()> { - let mut ev = Event::new(self.token); - ev.set_readable(); - self.port.post(ev.to_entry()) - } -} diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 9d0f796e..dab4a35c 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -8,6 +8,15 @@ use std::{ time::Duration, }; +#[cfg(unix)] +use mio::{event::Source, Events}; +use mio::{Interest, Token}; +#[cfg(windows)] +use { + polling::{Event, PollMode, Poller}, + std::os::windows::io::RawSocket, +}; + use super::{ op::{CompletionMeta, Op, OpAble}, ready::{self, Ready}, @@ -16,10 +25,6 @@ use super::{ }; use crate::utils::slab::Slab; -#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] -#[cfg(windows)] -pub(super) mod iocp; - #[cfg(feature = "sync")] mod waker; #[cfg(feature = "sync")] @@ -28,13 +33,13 @@ pub(crate) use waker::UnparkHandle; pub(crate) struct LegacyInner { pub(crate) io_dispatch: Slab, #[cfg(unix)] - events: mio::Events, + events: Events, #[cfg(unix)] poll: mio::Poll, #[cfg(windows)] - events: iocp::Events, + events: Vec, #[cfg(windows)] - poll: iocp::Poller, + poll: std::sync::Arc, #[cfg(feature = "sync")] shared_waker: std::sync::Arc, @@ -55,7 +60,7 @@ pub struct LegacyDriver { } #[cfg(feature = "sync")] -const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +const TOKEN_WAKEUP: Token = Token(1 << 31); #[allow(dead_code)] impl LegacyDriver { @@ -69,18 +74,13 @@ impl LegacyDriver { #[cfg(unix)] let poll = mio::Poll::new()?; #[cfg(windows)] - let poll = iocp::Poller::new()?; + let poll = std::sync::Arc::new(Poller::new()?); #[cfg(all(unix, feature = "sync"))] - let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new( - poll.registry(), - TOKEN_WAKEUP, - )?)); + let shared_waker = + std::sync::Arc::new(waker::EventWaker::new(poll.registry(), TOKEN_WAKEUP)?); #[cfg(all(windows, feature = "sync"))] - let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new( - &poll, - TOKEN_WAKEUP, - )?)); + let shared_waker = std::sync::Arc::new(waker::EventWaker::new(poll.clone(), TOKEN_WAKEUP)?); #[cfg(feature = "sync")] let (waker_sender, waker_receiver) = flume::unbounded::(); #[cfg(feature = "sync")] @@ -89,11 +89,11 @@ impl LegacyDriver { let inner = LegacyInner { io_dispatch: Slab::new(), #[cfg(unix)] - events: mio::Events::with_capacity(entries as usize), + events: Events::with_capacity(entries as usize), #[cfg(unix)] poll, #[cfg(windows)] - events: iocp::Events::with_capacity(entries as usize), + events: Vec::with_capacity(entries as usize), #[cfg(windows)] poll, #[cfg(feature = "sync")] @@ -152,25 +152,29 @@ impl LegacyDriver { // here we borrow 2 mut self, but its safe. let events = unsafe { &mut (*self.inner.get()).events }; - match inner.poll.poll(events, timeout) { + #[cfg(unix)] + let result = inner.poll.poll(events, timeout); + #[cfg(windows)] + let result = inner.poll.wait(events, timeout); + match result { Ok(_) => {} Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } - #[cfg(unix)] let iter = events.iter(); - #[cfg(windows)] - let iter = events.events.iter(); for event in iter { + #[cfg(unix)] let token = event.token(); + #[cfg(windows)] + let token = Token(event.key); #[cfg(feature = "sync")] if token != TOKEN_WAKEUP { - inner.dispatch(token, Ready::from_mio(event)); + inner.dispatch(token, Ready::from(event)); } #[cfg(not(feature = "sync"))] - inner.dispatch(token, Ready::from_mio(event)); + inner.dispatch(token, Ready::from(event)); } Ok(()) } @@ -178,14 +182,26 @@ impl LegacyDriver { #[cfg(windows)] pub(crate) fn register( this: &Rc>, - state: &mut iocp::SocketState, - interest: mio::Interest, + socket: RawSocket, + interest: Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; let io = ScheduledIo::default(); let token = inner.io_dispatch.insert(io); + let event = if interest.is_readable() && interest.is_writable() { + Event::all(token) + } else if interest.is_readable() { + Event::readable(token) + } else { + Event::writable(token) + }; + let mode = if inner.poll.supports_edge() { + PollMode::Edge + } else { + PollMode::Level + }; - match inner.poll.register(state, mio::Token(token), interest) { + match inner.poll.modify_with_mode(socket, event, mode) { Ok(_) => Ok(token), Err(e) => { inner.io_dispatch.remove(token); @@ -198,12 +214,12 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - state: &mut iocp::SocketState, + socket: RawSocket, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; // try to deregister fd first, on success we will remove it from slab. - match inner.poll.deregister(state) { + match inner.poll.delete(socket) { Ok(_) => { inner.io_dispatch.remove(token); Ok(()) @@ -215,14 +231,14 @@ impl LegacyDriver { #[cfg(unix)] pub(crate) fn register( this: &Rc>, - source: &mut impl mio::event::Source, - interest: mio::Interest, + source: &mut impl Source, + interest: Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; let token = inner.io_dispatch.insert(ScheduledIo::new()); let registry = inner.poll.registry(); - match registry.register(source, mio::Token(token), interest) { + match registry.register(source, Token(token), interest) { Ok(_) => Ok(token), Err(e) => { inner.io_dispatch.remove(token); @@ -235,7 +251,7 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - source: &mut impl mio::event::Source, + source: &mut impl Source, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; @@ -251,7 +267,7 @@ impl LegacyDriver { } impl LegacyInner { - fn dispatch(&mut self, token: mio::Token, ready: Ready) { + fn dispatch(&mut self, token: Token, ready: Ready) { let mut sio = match self.io_dispatch.get(token.0) { Some(io) => io, None => { @@ -324,7 +340,7 @@ impl LegacyInner { ready::Direction::Read => Ready::READ_CANCELED, ready::Direction::Write => Ready::WRITE_CANCELED, }; - inner.dispatch(mio::Token(index), ready); + inner.dispatch(Token(index), ready); } pub(crate) fn submit_with_data( diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 40290e96..d3a374ea 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -1,30 +1,38 @@ +use mio::Token; + use crate::driver::unpark::Unpark; pub(crate) struct EventWaker { // raw waker - #[cfg(windows)] - waker: super::iocp::Waker, #[cfg(unix)] waker: mio::Waker, + #[cfg(windows)] + poll: std::sync::Arc, + #[cfg(windows)] + token: Token, // Atomic awake status pub(crate) awake: std::sync::atomic::AtomicBool, } impl EventWaker { #[cfg(unix)] - pub(crate) fn new(waker: mio::Waker) -> Self { - Self { - waker, + pub(crate) fn new(registry: &mio::Registry, token: Token) -> std::io::Result { + Ok(Self { + waker: mio::Waker::new(registry, token)?, awake: std::sync::atomic::AtomicBool::new(true), - } + }) } #[cfg(windows)] - pub(crate) fn new(waker: super::iocp::Waker) -> Self { - Self { - waker, + pub(crate) fn new( + poll: std::sync::Arc, + token: Token, + ) -> std::io::Result { + Ok(Self { + poll, + token, awake: std::sync::atomic::AtomicBool::new(true), - } + }) } pub(crate) fn wake(&self) -> std::io::Result<()> { @@ -32,7 +40,15 @@ impl EventWaker { if self.awake.load(std::sync::atomic::Ordering::Acquire) { return Ok(()); } - self.waker.wake() + #[cfg(unix)] + let r = self.waker.wake(); + #[cfg(windows)] + use polling::os::iocp::PollerIocpExt; + #[cfg(windows)] + let r = self.poll.post(polling::os::iocp::CompletionPacket::new( + polling::Event::readable(self.token.0), + )); + r } } diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index a3e753df..b7701a2a 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -26,7 +26,7 @@ use std::{ }; #[allow(unreachable_pub)] -#[cfg(all(feature = "legacy", unix))] +#[cfg(feature = "legacy")] pub use self::legacy::LegacyDriver; #[cfg(feature = "legacy")] use self::legacy::LegacyInner; @@ -108,10 +108,7 @@ impl Inner { not(all(target_os = "linux", feature = "iouring")) ))] _ => { - #[cfg(unix)] util::feature_panic(); - #[cfg(windows)] - unimplemented!(); } } } @@ -124,8 +121,6 @@ impl Inner { cx: &mut Context<'_>, ) -> Poll { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::poll_op(this, index, cx), #[cfg(feature = "legacy")] @@ -147,8 +142,6 @@ impl Inner { cx: &mut Context<'_>, ) -> Poll { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::poll_legacy_op(this, data, cx), #[cfg(feature = "legacy")] @@ -166,8 +159,6 @@ impl Inner { #[allow(unused)] fn drop_op(&self, index: usize, data: &mut Option) { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::drop_op(this, index, data), #[cfg(feature = "legacy")] @@ -185,8 +176,6 @@ impl Inner { #[allow(unused)] pub(super) unsafe fn cancel_op(&self, op_canceller: &op::OpCanceller) { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::cancel_op(this, op_canceller.index), #[cfg(feature = "legacy")] diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 07e5f25e..866f19a2 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -1,13 +1,32 @@ -use std::{io, net::SocketAddr}; +#[cfg(all(windows, feature = "unstable"))] +use std::sync::LazyLock; +use std::{ + io, + mem::{transmute, MaybeUninit}, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(all(windows, not(feature = "unstable")))] +use once_cell::sync::Lazy as LazyLock; +#[cfg(windows)] +use windows_sys::{ + core::GUID, + Win32::{ + Networking::WinSock::{ + WSAGetLastError, WSAIoctl, AF_INET, AF_INET6, INVALID_SOCKET, LPFN_WSARECVMSG, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, + SOCKADDR_IN as sockaddr_in, SOCKADDR_IN6 as sockaddr_in6, + SOCKADDR_STORAGE as sockaddr_storage, SOCKET, SOCKET_ERROR, WSAID_WSARECVMSG, WSAMSG, + }, + System::IO::OVERLAPPED, + }, +}; #[cfg(unix)] use { crate::net::unix::SocketAddr as UnixSocketAddr, - libc::{socklen_t, AF_INET, AF_INET6}, - std::mem::{transmute, MaybeUninit}, - std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, + libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6}, }; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { @@ -20,7 +39,10 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBufMut, BufResult}; +use crate::{ + buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Recv { /// Holds a strong ref to the FD, preventing the file from being closed @@ -112,31 +134,31 @@ pub(crate) struct RecvMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + let mut info: Box<(MaybeUninit, IoVecMeta, MsgMeta)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + info.2.name = &mut info.0 as *mut _ as *mut SOCKADDR; + info.2.namelen = std::mem::size_of::() as _; + } Op::submit_with(RecvMsg { fd, buf, info }) } @@ -150,27 +172,32 @@ impl Op> { let storage = unsafe { complete.data.info.0.assume_init() }; let addr = unsafe { - match storage.ss_family as libc::c_int { + match storage.ss_family as _ { AF_INET => { // Safety: if the ss_family field is AF_INET then storage must be a // sockaddr_in. - let addr: &libc::sockaddr_in = transmute(&storage); + let addr: &sockaddr_in = transmute(&storage); + #[cfg(unix)] let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + #[cfg(windows)] + let ip = Ipv4Addr::from(addr.sin_addr.S_un.S_addr.to_ne_bytes()); let port = u16::from_be(addr.sin_port); SocketAddr::V4(SocketAddrV4::new(ip, port)) } AF_INET6 => { // Safety: if the ss_family field is AF_INET6 then storage must be a // sockaddr_in6. - let addr: &libc::sockaddr_in6 = transmute(&storage); + let addr: &sockaddr_in6 = transmute(&storage); + #[cfg(unix)] let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + #[cfg(windows)] + let ip = Ipv6Addr::from(addr.sin6_addr.u.Byte); let port = u16::from_be(addr.sin6_port); - SocketAddr::V6(SocketAddrV6::new( - ip, - port, - addr.sin6_flowinfo, - addr.sin6_scope_id, - )) + #[cfg(unix)] + let scope_id = addr.sin6_scope_id; + #[cfg(windows)] + let scope_id = addr.Anonymous.sin6_scope_id; + SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)) } _ => { unreachable!() @@ -179,9 +206,7 @@ impl Op> { }; // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } + unsafe { buf.set_init(n) }; (n, addr) }); @@ -189,22 +214,42 @@ impl Op> { } } +/// see https://github.com/microsoft/windows-rs/issues/2530 #[cfg(windows)] -impl Op> { - #[allow(unused_mut, unused_variables)] - pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { - unimplemented!() +static WSA_RECV_MSG: LazyLock< + unsafe extern "system" fn( + SOCKET, + *mut WSAMSG, + *mut u32, + *mut OVERLAPPED, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, + ) -> i32, +> = LazyLock::new(|| unsafe { + let mut wsa_recv_msg: LPFN_WSARECVMSG = None; + let mut dw_bytes = 0; + let r = WSAIoctl( + INVALID_SOCKET, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &WSAID_WSARECVMSG as *const _ as *const std::ffi::c_void, + std::mem::size_of:: as usize as u32, + &mut wsa_recv_msg as *mut _ as *mut std::ffi::c_void, + std::mem::size_of::() as _, + &mut dw_bytes, + std::ptr::null_mut(), + None, + ); + if r == SOCKET_ERROR || wsa_recv_msg.is_none() { + panic!("{}", io::Error::from_raw_os_error(WSAGetLastError())) + } else { + assert_eq!(dw_bytes, std::mem::size_of::() as _); + wsa_recv_msg.unwrap() } -} +}); impl OpAble for RecvMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { - opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() + opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut *self.info.2).build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] @@ -216,13 +261,27 @@ impl OpAble for RecvMsg { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut recved = 0; + let r = unsafe { + (LazyLock::force(&WSA_RECV_MSG))( + fd as _, + &mut *self.info.2, + &mut recved, + std::ptr::null_mut(), + None, + ) + }; + if r == SOCKET_ERROR { + unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) } + } else { + Ok(recved) + } } } @@ -235,30 +294,22 @@ pub(crate) struct RecvMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] impl Op> { pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsgUnix { fd, buf, info }) } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index def8ba34..f8e5c285 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -2,12 +2,12 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(unix)] -use {crate::net::unix::SocketAddr as UnixSocketAddr, socket2::SockAddr}; +use socket2::SockAddr; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::syscall, std::os::windows::io::AsRawSocket, - windows_sys::Win32::Networking::WinSock::send, + crate::syscall, + std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{send, WSASendMsg, SOCKET_ERROR}, }; #[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; @@ -15,7 +15,12 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBuf, BufResult}; +#[cfg(unix)] +use crate::net::unix::SocketAddr as UnixSocketAddr; +use crate::{ + buf::{IoBuf, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Send { /// Holds a strong ref to the FD, preventing the file from being closed @@ -123,37 +128,50 @@ pub(crate) struct SendMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn send_msg( fd: SharedFd, buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - - match info.0.as_ref() { - Some(socket_addr) => { - info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - info.2.msg_namelen = socket_addr.len(); + let mut info: Box<(Option, IoVecMeta, MsgMeta)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + info.2.msg_namelen = socket_addr.len(); + } + None => { + info.2.msg_name = std::ptr::null_mut(); + info.2.msg_namelen = 0; + } } - None => { - info.2.msg_name = std::ptr::null_mut(); - info.2.msg_namelen = 0; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.name = socket_addr.as_ptr() as *mut _; + info.2.namelen = socket_addr.len(); + } + None => { + info.2.name = std::ptr::null_mut(); + info.2.namelen = 0; + } } } @@ -168,28 +186,12 @@ impl Op> { } } -#[cfg(windows)] -impl Op> { - #[allow(unused_variables)] - pub(crate) fn send_msg( - fd: SharedFd, - buf: T, - socket_addr: Option, - ) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult { - unimplemented!() - } -} - impl OpAble for SendMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { #[allow(deprecated)] const FLAGS: u32 = libc::MSG_NOSIGNAL as u32; - opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _) + opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &*self.info.2) .flags(FLAGS) .build() } @@ -210,13 +212,28 @@ impl OpAble for SendMsg { #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) + syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut nsent = 0; + let ret = unsafe { + WSASendMsg( + fd as _, + &*self.info.2, + 0, + &mut nsent, + std::ptr::null_mut(), + None, + ) + }; + if ret == SOCKET_ERROR { + Err(io::Error::last_os_error()) + } else { + Ok(nsent) + } } } @@ -229,7 +246,8 @@ pub(crate) struct SendMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] @@ -239,17 +257,14 @@ impl Op> { buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(Option, IoVecMeta, libc::msghdr)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; match info.0.as_ref() { Some(socket_addr) => { diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs index 960503c2..95c0a548 100644 --- a/monoio/src/driver/poll.rs +++ b/monoio/src/driver/poll.rs @@ -1,4 +1,4 @@ -use std::{io, os::fd::AsRawFd, task::Context, time::Duration}; +use std::{io, task::Context, time::Duration}; use super::{ready::Direction, scheduled_io::ScheduledIo}; use crate::{driver::op::CompletionMeta, utils::slab::Slab}; @@ -33,7 +33,7 @@ impl Poll { if let Some(mut sio) = self.io_dispatch.get(token.0) { let ref_mut = sio.as_mut(); - let ready = super::ready::Ready::from_mio(event); + let ready = super::ready::Ready::from(event); ref_mut.set_readiness(|curr| curr | ready); ref_mut.wake(ready); } @@ -100,7 +100,8 @@ impl Poll { } } -impl AsRawFd for Poll { +#[cfg(unix)] +impl std::os::fd::AsRawFd for Poll { #[inline] fn as_raw_fd(&self) -> std::os::fd::RawFd { self.poll.as_raw_fd() diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index 6262359a..d415e1f5 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -45,64 +45,6 @@ impl Ready { pub(crate) const READ_ALL: Ready = Ready(READABLE | READ_CLOSED | READ_CANCELED); pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); - #[cfg(windows)] - pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready { - let mut ready = Ready::EMPTY; - - if event.is_readable() { - ready |= Ready::READABLE; - } - - if event.is_writable() { - ready |= Ready::WRITABLE; - } - - if event.is_read_closed() { - ready |= Ready::READ_CLOSED; - } - - if event.is_write_closed() { - ready |= Ready::WRITE_CLOSED; - } - - ready - } - - #[cfg(unix)] - // Must remain crate-private to avoid adding a public dependency on Mio. - pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { - let mut ready = Ready::EMPTY; - - #[cfg(all(target_os = "freebsd", feature = "net"))] - { - if event.is_aio() { - ready |= Ready::READABLE; - } - - if event.is_lio() { - ready |= Ready::READABLE; - } - } - - if event.is_readable() { - ready |= Ready::READABLE; - } - - if event.is_writable() { - ready |= Ready::WRITABLE; - } - - if event.is_read_closed() { - ready |= Ready::READ_CLOSED; - } - - if event.is_write_closed() { - ready |= Ready::WRITE_CLOSED; - } - - ready - } - /// Returns true if `Ready` is the empty set. pub(crate) fn is_empty(self) -> bool { self == Ready::EMPTY @@ -227,6 +169,59 @@ impl fmt::Debug for Ready { } } +// Must remain crate-private to avoid adding a public dependency on Mio. +impl From<&mio::event::Event> for Ready { + fn from(event: &mio::event::Event) -> Self { + let mut ready = Ready::EMPTY; + + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + + if event.is_readable() { + ready |= Ready::READABLE; + } + + if event.is_writable() { + ready |= Ready::WRITABLE; + } + + if event.is_read_closed() { + ready |= Ready::READ_CLOSED; + } + + if event.is_write_closed() { + ready |= Ready::WRITE_CLOSED; + } + + ready + } +} + +#[cfg(windows)] +impl From<&polling::Event> for Ready { + fn from(event: &polling::Event) -> Self { + let mut ready = Ready::EMPTY; + + if event.readable { + ready |= Ready::READABLE; + } + + if event.writable { + ready |= Ready::WRITABLE; + } + + ready + } +} + #[derive(Debug, Eq, PartialEq, Clone, Copy, Hash)] pub(crate) enum Direction { Read, diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 499ed3d2..5feb27bc 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -2,12 +2,10 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{ - AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket, + AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket as RawFd, RawSocket, }; use std::{cell::UnsafeCell, io, rc::Rc}; -#[cfg(windows)] -use super::legacy::iocp::SocketState as RawFd; use super::CURRENT; // Tracks in-flight operations on a file descriptor. Ensures all in-flight @@ -245,12 +243,10 @@ impl SharedFd { pub(crate) fn new(fd: RawSocket) -> io::Result { const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - let mut fd = RawFd::new(fd); - let state = { let reg = CURRENT.with(|inner| match inner { super::Inner::Legacy(inner) => { - super::legacy::LegacyDriver::register(inner, &mut fd, RW_INTERESTS) + super::legacy::LegacyDriver::register(inner, fd, RW_INTERESTS) } }); @@ -272,7 +268,7 @@ impl SharedFd { let state = CURRENT.with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] super::Inner::Uring(_) => State::Uring(UringState::Init), - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] super::Inner::Legacy(_) => State::Legacy(None), #[cfg(all( not(feature = "legacy"), @@ -300,7 +296,7 @@ impl SharedFd { SharedFd { inner: Rc::new(Inner { - fd: RawFd::new(fd), + fd, state: UnsafeCell::new(state), }), } @@ -315,12 +311,12 @@ impl SharedFd { #[cfg(windows)] /// Returns the RawSocket pub(crate) fn raw_socket(&self) -> RawSocket { - self.inner.fd.socket + self.inner.fd } #[cfg(windows)] pub(crate) fn raw_handle(&self) -> RawHandle { - unimplemented!() + self.inner.fd as usize as _ } #[cfg(unix)] @@ -339,10 +335,10 @@ impl SharedFd { let mut state = unsafe { MaybeUninit::uninit().assume_init() }; std::mem::swap(&mut inner_skip_drop.state, &mut state); - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] let state = unsafe { &*state.get() }; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] #[allow(irrefutable_let_patterns)] if let State::Legacy(idx) = state { if CURRENT.is_set() { @@ -556,7 +552,7 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } #[cfg(windows)] if let Some(idx) = idx { - let _ = super::legacy::LegacyDriver::deregister(inner, idx, &mut fd); + let _ = super::legacy::LegacyDriver::deregister(inner, idx, fd); } } } @@ -564,8 +560,8 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } #[cfg(all(unix, feature = "legacy"))] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; - #[cfg(windows)] - let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; + #[cfg(all(windows, feature = "legacy"))] + let _ = unsafe { OwnedSocket::from_raw_socket(fd) }; } #[cfg(feature = "poll-io")] @@ -573,7 +569,7 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { if CURRENT.is_set() { CURRENT.with(|inner| { match inner { - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] super::Inner::Legacy(_) => { unreachable!("close uring fd with legacy runtime") } @@ -591,5 +587,5 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { #[cfg(unix)] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; #[cfg(windows)] - let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; + let _ = unsafe { OwnedSocket::from_raw_socket(fd) }; } diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 53992338..e8c0f649 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -8,9 +8,7 @@ pub(super) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } #[cfg(windows)] - { - unimplemented!() - } + Ok(CString::new(p.as_os_str().as_encoded_bytes())?) } // Convert Duration to Timespec diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index 513c1f1a..40462e59 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -39,15 +39,12 @@ pub use builder::{Buildable, RuntimeBuilder}; pub use driver::Driver; #[cfg(all(target_os = "linux", feature = "iouring"))] pub use driver::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] pub use driver::LegacyDriver; #[cfg(feature = "macros")] pub use monoio_macros::{main, test, test_all}; pub use runtime::{spawn, Runtime}; -#[cfg(all( - unix, - any(all(target_os = "linux", feature = "iouring"), feature = "legacy") -))] +#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] pub use {builder::FusionDriver, runtime::FusionRuntime}; /// Start a monoio runtime. diff --git a/monoio/src/macros/pin.rs b/monoio/src/macros/pin.rs index 1637d35c..83f48c2e 100644 --- a/monoio/src/macros/pin.rs +++ b/monoio/src/macros/pin.rs @@ -8,20 +8,6 @@ /// Pinning may be done by allocating with [`Box::pin`] or by using the stack /// with the `pin!` macro. /// -/// The following will **fail to compile**: -/// -/// ```compile_fail -/// async fn my_async_fn() { -/// // async logic here -/// } -/// -/// #[monoio::main] -/// async fn main() { -/// let mut future = my_async_fn(); -/// (&mut future).await; -/// } -/// ``` -/// /// To make this work requires pinning: /// /// ``` @@ -51,20 +37,6 @@ /// The `pin!` macro takes **identifiers** as arguments. It does **not** work /// with expressions. /// -/// The following does not compile as an expression is passed to `pin!`. -/// -/// ```compile_fail -/// async fn my_async_fn() { -/// // async logic here -/// } -/// -/// #[monoio::main] -/// async fn main() { -/// let mut future = pin!(my_async_fn()); -/// (&mut future).await; -/// } -/// ``` -/// /// Because assigning to a variable followed by pinning is common, there is also /// a variant of the macro that supports doing both in one go. /// diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 449c86eb..2f2649e1 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -34,7 +34,7 @@ impl TcpListener { #[cfg(unix)] let sys_listener = unsafe { std::net::TcpListener::from_raw_fd(fd.raw_fd()) }; #[cfg(windows)] - let sys_listener = unsafe { std::net::TcpListener::from_raw_socket(todo!()) }; + let sys_listener = unsafe { std::net::TcpListener::from_raw_socket(fd.raw_socket()) }; Self { fd, sys_listener: Some(sys_listener), @@ -57,7 +57,7 @@ impl TcpListener { let sys_listener = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] Self::set_non_blocking(&sys_listener)?; let addr = socket2::SockAddr::from(addr); @@ -225,7 +225,7 @@ impl TcpListener { }) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> { crate::driver::CURRENT.with(|x| match x { // TODO: windows ioring support diff --git a/monoio/src/net/tcp/stream_poll.rs b/monoio/src/net/tcp/stream_poll.rs index 69435135..14e12f7d 100644 --- a/monoio/src/net/tcp/stream_poll.rs +++ b/monoio/src/net/tcp/stream_poll.rs @@ -1,6 +1,11 @@ //! This module provide a poll-io style interface for TcpStream. -use std::{io, net::SocketAddr, os::fd::AsRawFd, time::Duration}; +use std::{io, net::SocketAddr, time::Duration}; + +#[cfg(unix)] +use {libc::shutdown, std::os::fd::AsRawFd}; +#[cfg(windows)] +use {std::os::windows::io::AsRawSocket, windows_sys::Win32::Networking::WinSock::shutdown}; use super::TcpStream; use crate::driver::op::Op; @@ -101,8 +106,15 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + #[cfg(unix)] let fd = self.0.as_raw_fd(); - let res = match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { + #[cfg(windows)] + let fd = self.0.as_raw_socket() as _; + #[cfg(unix)] + let how = libc::SHUT_WR; + #[cfg(windows)] + let how = windows_sys::Win32::Networking::WinSock::SD_SEND; + let res = match unsafe { shutdown(fd, how) } { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; @@ -116,8 +128,7 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { bufs: &[std::io::IoSlice<'_>], ) -> std::task::Poll> { unsafe { - let raw_buf = - crate::buf::RawBufVectored::new(bufs.as_ptr() as *const libc::iovec, bufs.len()); + let raw_buf = crate::buf::RawBufVectored::new(bufs.as_ptr() as _, bufs.len()); let mut writev = Op::writev_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); @@ -168,9 +179,17 @@ impl TcpStreamPoll { } } +#[cfg(unix)] impl AsRawFd for TcpStreamPoll { #[inline] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { self.0.as_raw_fd() } } + +#[cfg(windows)] +impl AsRawSocket for TcpStreamPoll { + fn as_raw_socket(&self) -> std::os::windows::io::RawSocket { + self.0.as_raw_socket() + } +} diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs index 6b0ddded..77608c3a 100644 --- a/monoio/src/net/udp.rs +++ b/monoio/src/net/udp.rs @@ -37,7 +37,7 @@ impl UdpSocket { Self { fd } } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> { crate::driver::CURRENT.with(|x| match x { // TODO: windows ioring support @@ -60,7 +60,7 @@ impl UdpSocket { }; let socket = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] Self::set_non_blocking(&socket)?; let addr = socket2::SockAddr::from(addr); diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index 38db6bb3..3e1aee0e 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -36,6 +36,7 @@ impl SocketAddr { return AddressKind::Unnamed; } let len = self.socklen as usize - offset; + #[allow(clippy::unnecessary_cast)] let path = unsafe { &*(&self.sockaddr.sun_path as *const [libc::c_char] as *const [u8]) }; // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index e1e4d7bb..16729239 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,13 +1,10 @@ use std::future::Future; -#[cfg(all( - unix, - any(all(target_os = "linux", feature = "iouring"), feature = "legacy") -))] +#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] use crate::LegacyDriver; use crate::{ driver::Driver, @@ -201,7 +198,7 @@ impl Runtime { /// Fusion Runtime is a wrapper of io_uring driver or legacy driver based /// runtime. -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> { /// Uring driver based runtime. #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -242,11 +239,7 @@ where } } -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl FusionRuntime where R: Driver, @@ -315,11 +308,7 @@ impl From>> } // R -> Fusion -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Legacy(r) @@ -327,11 +316,7 @@ impl From> for FusionRuntime { } // TR -> Fusion -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl From>> for FusionRuntime> { fn from(r: Runtime>) -> Self { Self::Legacy(r) diff --git a/monoio/tests/buf_writter.rs b/monoio/tests/buf_writter.rs index c1f9e00a..f4236404 100644 --- a/monoio/tests/buf_writter.rs +++ b/monoio/tests/buf_writter.rs @@ -3,6 +3,7 @@ use monoio::{ net::{TcpListener, TcpStream}, }; +#[cfg(not(windows))] #[monoio::test_all] async fn ensure_buf_writter_write_properly() { let srv = TcpListener::bind("127.0.0.1:0").unwrap(); diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index b0fa8cc8..84d4eeaf 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -1,7 +1,8 @@ -use std::{ - io::prelude::*, - os::unix::io::{AsRawFd, FromRawFd, RawFd}, -}; +use std::io::prelude::*; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle as RawFd}; use monoio::fs::File; use tempfile::NamedTempFile; @@ -83,7 +84,10 @@ async fn explicit_close() { tempfile.write_all(HELLO).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); + #[cfg(unix)] let fd = file.as_raw_fd(); + #[cfg(windows)] + let fd = file.as_raw_handle(); file.close().await.unwrap(); @@ -103,6 +107,7 @@ async fn drop_open() { drop(file_w); } +#[cfg(not(windows))] #[test] fn drop_off_runtime() { let tempfile = tempfile(); @@ -115,7 +120,10 @@ fn drop_off_runtime() { File::open(tempfile.path()).await.unwrap() }); + #[cfg(unix)] let fd = file.as_raw_fd(); + #[cfg(windows)] + let fd = file.as_raw_handle(); drop(file); assert_invalid_fd(fd); @@ -153,8 +161,10 @@ async fn poll_once(future: impl std::future::Future) { fn assert_invalid_fd(fd: RawFd) { use std::fs::File; - + #[cfg(unix)] let mut f = unsafe { File::from_raw_fd(fd) }; + #[cfg(windows)] + let mut f = unsafe { File::from_raw_handle(fd) }; let mut buf = vec![]; assert!(f.read_to_end(&mut buf).is_err()); diff --git a/monoio/tests/uds_cred.rs b/monoio/tests/uds_cred.rs index 1a371899..717d6589 100644 --- a/monoio/tests/uds_cred.rs +++ b/monoio/tests/uds_cred.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use libc::{getegid, geteuid}; use monoio::net::UnixStream; diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs index 21f8cefa..350a51c4 100644 --- a/monoio/tests/uds_split.rs +++ b/monoio/tests/uds_split.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt, Splitable}, net::UnixStream, diff --git a/monoio/tests/uds_stream.rs b/monoio/tests/uds_stream.rs index 067f3b45..5d6c3772 100644 --- a/monoio/tests/uds_stream.rs +++ b/monoio/tests/uds_stream.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use futures::future::try_join; use monoio::{ io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt}, diff --git a/monoio/tests/unix_datagram.rs b/monoio/tests/unix_datagram.rs index f5802585..67fec883 100644 --- a/monoio/tests/unix_datagram.rs +++ b/monoio/tests/unix_datagram.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use monoio::net::unix::UnixDatagram; #[monoio::test_all]