From 9c5611a46cb7da146340e2bf836af69b69e8a4d0 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 17:57:56 +0800 Subject: [PATCH 01/19] use polling --- .github/workflows/ci.yml | 4 - monoio/Cargo.toml | 5 +- monoio/src/driver/legacy/iocp/afd.rs | 201 ---------------- monoio/src/driver/legacy/iocp/core.rs | 123 ---------- monoio/src/driver/legacy/iocp/event.rs | 120 ---------- monoio/src/driver/legacy/iocp/mod.rs | 312 ------------------------- monoio/src/driver/legacy/iocp/state.rs | 291 ----------------------- monoio/src/driver/legacy/iocp/waker.rs | 26 --- monoio/src/driver/legacy/mod.rs | 78 ++++--- monoio/src/driver/legacy/waker.rs | 10 - monoio/src/driver/ready.rs | 14 +- monoio/src/driver/shared_fd.rs | 16 +- 12 files changed, 61 insertions(+), 1139 deletions(-) delete mode 100644 monoio/src/driver/legacy/iocp/afd.rs delete mode 100644 monoio/src/driver/legacy/iocp/core.rs delete mode 100644 monoio/src/driver/legacy/iocp/event.rs delete mode 100644 monoio/src/driver/legacy/iocp/mod.rs delete mode 100644 monoio/src/driver/legacy/iocp/state.rs delete mode 100644 monoio/src/driver/legacy/iocp/waker.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b07fb910..26421d76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,13 +101,9 @@ jobs: - target: x86_64-pc-windows-msvc os: windows-latest - no_run: 1 - target: x86_64-pc-windows-gnu os: windows-latest - no_run: 1 - target: i686-pc-windows-msvc os: windows-latest - no_run: 1 - target: i686-pc-windows-gnu os: windows-latest - no_run: 1 diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 6a5cce1c..eb1e6fd3 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -46,6 +46,7 @@ windows-sys = { version = "0.48.0", features = [ "Win32_Storage_FileSystem", "Win32_Security" ] } +polling = { version = "2.8.0", optional = true } # unix dependencies [target.'cfg(unix)'.dependencies] @@ -78,13 +79,13 @@ utils = ["nix"] # enable debug if you want to know what runtime does debug = ["tracing"] # enable legacy driver support(will make monoio available for older kernel and macOS) -legacy = ["mio"] +legacy = ["mio", "polling"] # iouring support iouring = ["io-uring"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) tokio-compat = ["tokio"] # (experimental)enable poll-io to convert structs to structs that impl tokio's poll io -poll-io = ["tokio", "mio"] +poll-io = ["tokio", "mio", "polling"] # signal enables setting ctrl_c handler signal = ["ctrlc", "sync"] signal-termination = ["signal", "ctrlc/termination"] diff --git a/monoio/src/driver/legacy/iocp/afd.rs b/monoio/src/driver/legacy/iocp/afd.rs deleted file mode 100644 index 05d73056..00000000 --- a/monoio/src/driver/legacy/iocp/afd.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::{ - ffi::c_void, - fs::File, - os::windows::prelude::{AsRawHandle, FromRawHandle, RawHandle}, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use windows_sys::Win32::{ - Foundation::{ - RtlNtStatusToDosError, HANDLE, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_NOT_FOUND, - STATUS_PENDING, STATUS_SUCCESS, UNICODE_STRING, - }, - Storage::FileSystem::{ - NtCreateFile, SetFileCompletionNotificationModes, FILE_OPEN, FILE_SHARE_READ, - FILE_SHARE_WRITE, SYNCHRONIZE, - }, - System::WindowsProgramming::{ - NtDeviceIoControlFile, FILE_SKIP_SET_EVENT_ON_HANDLE, IO_STATUS_BLOCK, IO_STATUS_BLOCK_0, - OBJECT_ATTRIBUTES, - }, -}; - -use super::CompletionPort; - -#[link(name = "ntdll")] -extern "system" { - /// See - /// - /// This is an undocumented API and as such not part of - /// from which `windows-sys` is generated, and also unlikely to be added, so - /// we manually declare it here - fn NtCancelIoFileEx( - FileHandle: HANDLE, - IoRequestToCancel: *mut IO_STATUS_BLOCK, - IoStatusBlock: *mut IO_STATUS_BLOCK, - ) -> NTSTATUS; -} - -static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0); - -macro_rules! s { - ($($id:expr)+) => { - &[$($id as u16),+] - } -} - -pub const POLL_RECEIVE: u32 = 0b0_0000_0001; -pub const POLL_RECEIVE_EXPEDITED: u32 = 0b0_0000_0010; -pub const POLL_SEND: u32 = 0b0_0000_0100; -pub const POLL_DISCONNECT: u32 = 0b0_0000_1000; -pub const POLL_ABORT: u32 = 0b0_0001_0000; -pub const POLL_LOCAL_CLOSE: u32 = 0b0_0010_0000; -// Not used as it indicated in each event where a connection is connected, not -// just the first time a connection is established. -// Also see https://github.com/piscisaureus/wepoll/commit/8b7b340610f88af3d83f40fb728e7b850b090ece. -pub const POLL_CONNECT: u32 = 0b0_0100_0000; -pub const POLL_ACCEPT: u32 = 0b0_1000_0000; -pub const POLL_CONNECT_FAIL: u32 = 0b1_0000_0000; - -pub const KNOWN_EVENTS: u32 = POLL_RECEIVE - | POLL_RECEIVE_EXPEDITED - | POLL_SEND - | POLL_DISCONNECT - | POLL_ABORT - | POLL_LOCAL_CLOSE - | POLL_ACCEPT - | POLL_CONNECT_FAIL; - -#[repr(C)] -#[derive(Debug)] -pub struct AfdPollHandleInfo { - pub handle: HANDLE, - pub events: u32, - pub status: NTSTATUS, -} - -#[repr(C)] -#[derive(Debug)] -pub struct AfdPollInfo { - pub timeout: i64, - pub number_of_handles: u32, - pub exclusive: u32, - pub handles: [AfdPollHandleInfo; 1], -} - -#[derive(Debug)] -pub struct Afd { - file: File, -} - -impl Afd { - pub fn new(cp: &CompletionPort) -> std::io::Result { - const AFD_NAME: &[u16] = s!['\\' 'D' 'e' 'v' 'i' 'c' 'e' '\\' 'A' 'f' 'd' '\\' 'I' 'o']; - let mut device_name = UNICODE_STRING { - Length: std::mem::size_of_val(AFD_NAME) as u16, - MaximumLength: std::mem::size_of_val(AFD_NAME) as u16, - Buffer: AFD_NAME.as_ptr() as *mut u16, - }; - let mut device_attributes = OBJECT_ATTRIBUTES { - Length: std::mem::size_of::() as u32, - RootDirectory: 0, - ObjectName: &mut device_name, - Attributes: 0, - SecurityDescriptor: std::ptr::null_mut(), - SecurityQualityOfService: std::ptr::null_mut(), - }; - let mut handle = INVALID_HANDLE_VALUE; - let mut iosb = unsafe { std::mem::zeroed::() }; - let result = unsafe { - NtCreateFile( - &mut handle, - SYNCHRONIZE, - &mut device_attributes, - &mut iosb, - std::ptr::null_mut(), - 0, - FILE_SHARE_READ | FILE_SHARE_WRITE, - FILE_OPEN, - 0, - std::ptr::null_mut(), - 0, - ) - }; - - if result != STATUS_SUCCESS { - let error = unsafe { RtlNtStatusToDosError(result) }; - return Err(std::io::Error::from_raw_os_error(error as i32)); - } - - let file = unsafe { File::from_raw_handle(handle as RawHandle) }; - // Increment by 2 to reserve space for other types of handles. - // Non-AFD types (currently only NamedPipe), use odd numbered - // tokens. This allows the selector to differentiate between them - // and dispatch events accordingly. - let token = NEXT_TOKEN.fetch_add(2, Ordering::Relaxed) + 2; - cp.add_handle(token, file.as_raw_handle() as HANDLE)?; - let result = unsafe { - SetFileCompletionNotificationModes( - handle, - FILE_SKIP_SET_EVENT_ON_HANDLE as u8, // This is just 2, so fits in u8 - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(Self { file }) - } - } - - pub unsafe fn poll( - &self, - info: &mut AfdPollInfo, - iosb: *mut IO_STATUS_BLOCK, - overlapped: *mut c_void, - ) -> std::io::Result { - const IOCTL_AFD_POLL: u32 = 0x00012024; - let info_ptr = info as *mut _ as *mut c_void; - (*iosb).Anonymous.Status = STATUS_PENDING; - - let result = NtDeviceIoControlFile( - self.file.as_raw_handle() as HANDLE, - 0, - None, - overlapped, - iosb, - IOCTL_AFD_POLL, - info_ptr, - std::mem::size_of::() as u32, - info_ptr, - std::mem::size_of::() as u32, - ); - - match result { - STATUS_SUCCESS => Ok(true), - STATUS_PENDING => Ok(false), - status => { - let error = RtlNtStatusToDosError(status); - Err(std::io::Error::from_raw_os_error(error as i32)) - } - } - } - - pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> std::io::Result<()> { - if (*iosb).Anonymous.Status != STATUS_PENDING { - return Ok(()); - } - let mut cancel_iosb = IO_STATUS_BLOCK { - Anonymous: IO_STATUS_BLOCK_0 { Status: 0 }, - Information: 0, - }; - let status = NtCancelIoFileEx(self.file.as_raw_handle() as HANDLE, iosb, &mut cancel_iosb); - - if status == STATUS_SUCCESS || status == STATUS_NOT_FOUND { - Ok(()) - } else { - let error = RtlNtStatusToDosError(status); - Err(std::io::Error::from_raw_os_error(error as i32)) - } - } -} diff --git a/monoio/src/driver/legacy/iocp/core.rs b/monoio/src/driver/legacy/iocp/core.rs deleted file mode 100644 index d871fd10..00000000 --- a/monoio/src/driver/legacy/iocp/core.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::{ - os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}, - time::Duration, -}; - -use windows_sys::Win32::{ - Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}, - System::IO::{ - CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, - OVERLAPPED_ENTRY, - }, -}; - -#[derive(Debug)] -pub struct CompletionPort { - handle: HANDLE, -} - -impl CompletionPort { - pub fn new(value: u32) -> std::io::Result { - let handle = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, value) }; - - if handle == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(Self { handle }) - } - } - - pub fn add_handle(&self, token: usize, handle: HANDLE) -> std::io::Result<()> { - let result = unsafe { CreateIoCompletionPort(handle, self.handle, token, 0) }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) - } - } - - pub fn get_many<'a>( - &self, - entries: &'a mut [OVERLAPPED_ENTRY], - timeout: Option, - ) -> std::io::Result<&'a mut [OVERLAPPED_ENTRY]> { - let mut count = 0; - let result = unsafe { - GetQueuedCompletionStatusEx( - self.handle, - entries.as_mut_ptr(), - std::cmp::min(entries.len(), u32::max_value() as usize) as u32, - &mut count, - duration_millis(timeout), - 0, - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(&mut entries[..count as usize]) - } - } - - pub fn post(&self, entry: OVERLAPPED_ENTRY) -> std::io::Result<()> { - let result = unsafe { - PostQueuedCompletionStatus( - self.handle, - entry.dwNumberOfBytesTransferred, - entry.lpCompletionKey, - entry.lpOverlapped, - ) - }; - - if result == 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) - } - } -} - -impl Drop for CompletionPort { - fn drop(&mut self) { - unsafe { CloseHandle(self.handle) }; - } -} - -impl AsRawHandle for CompletionPort { - fn as_raw_handle(&self) -> RawHandle { - self.handle as RawHandle - } -} - -impl FromRawHandle for CompletionPort { - unsafe fn from_raw_handle(handle: RawHandle) -> Self { - Self { - handle: handle as HANDLE, - } - } -} - -impl IntoRawHandle for CompletionPort { - fn into_raw_handle(self) -> RawHandle { - self.handle as RawHandle - } -} - -#[inline] -fn duration_millis(dur: Option) -> u32 { - if let Some(dur) = dur { - // `Duration::as_millis` truncates, so round up. This avoids - // turning sub-millisecond timeouts into a zero timeout, unless - // the caller explicitly requests that by specifying a zero - // timeout. - let dur_ms = dur - .checked_add(Duration::from_nanos(999_999)) - .unwrap_or(dur) - .as_millis(); - std::cmp::min(dur_ms, u32::MAX as u128) as u32 - } else { - u32::MAX - } -} diff --git a/monoio/src/driver/legacy/iocp/event.rs b/monoio/src/driver/legacy/iocp/event.rs deleted file mode 100644 index 0f962ff8..00000000 --- a/monoio/src/driver/legacy/iocp/event.rs +++ /dev/null @@ -1,120 +0,0 @@ -use mio::Token; -use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY; - -use super::afd; - -#[derive(Clone)] -pub struct Event { - pub flags: u32, - pub data: u64, -} - -impl Event { - pub fn new(token: Token) -> Event { - Event { - flags: 0, - data: usize::from(token) as u64, - } - } - - pub fn token(&self) -> Token { - Token(self.data as usize) - } - - pub fn set_readable(&mut self) { - self.flags |= afd::POLL_RECEIVE - } - - pub fn set_writable(&mut self) { - self.flags |= afd::POLL_SEND; - } - - pub fn from_entry(status: &OVERLAPPED_ENTRY) -> Event { - Event { - flags: status.dwNumberOfBytesTransferred, - data: status.lpCompletionKey as u64, - } - } - - pub fn to_entry(&self) -> OVERLAPPED_ENTRY { - OVERLAPPED_ENTRY { - dwNumberOfBytesTransferred: self.flags, - lpCompletionKey: self.data as usize, - lpOverlapped: std::ptr::null_mut(), - Internal: 0, - } - } - - pub fn is_readable(&self) -> bool { - self.flags & READABLE_FLAGS != 0 - } - - pub fn is_writable(&self) -> bool { - self.flags & WRITABLE_FLAGS != 0 - } - - pub fn is_error(&self) -> bool { - self.flags & ERROR_FLAGS != 0 - } - - pub fn is_read_closed(&self) -> bool { - self.flags & READ_CLOSED_FLAGS != 0 - } - - pub fn is_write_closed(&self) -> bool { - self.flags & WRITE_CLOSED_FLAGS != 0 - } - - pub fn is_priority(&self) -> bool { - self.flags & afd::POLL_RECEIVE_EXPEDITED != 0 - } -} - -pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE - | afd::POLL_DISCONNECT - | afd::POLL_ACCEPT - | afd::POLL_ABORT - | afd::POLL_CONNECT_FAIL; -pub(crate) const WRITABLE_FLAGS: u32 = afd::POLL_SEND | afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; -pub(crate) const ERROR_FLAGS: u32 = afd::POLL_CONNECT_FAIL; -pub(crate) const READ_CLOSED_FLAGS: u32 = - afd::POLL_DISCONNECT | afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; -pub(crate) const WRITE_CLOSED_FLAGS: u32 = afd::POLL_ABORT | afd::POLL_CONNECT_FAIL; - -pub struct Events { - pub statuses: Box<[OVERLAPPED_ENTRY]>, - - pub events: Vec, -} - -impl Events { - pub fn with_capacity(cap: usize) -> Events { - Events { - statuses: unsafe { vec![std::mem::zeroed(); cap].into_boxed_slice() }, - events: Vec::with_capacity(cap), - } - } - - pub fn is_empty(&self) -> bool { - self.events.is_empty() - } - - pub fn capacity(&self) -> usize { - self.events.capacity() - } - - pub fn len(&self) -> usize { - self.events.len() - } - - pub fn get(&self, idx: usize) -> Option<&Event> { - self.events.get(idx) - } - - pub fn clear(&mut self) { - self.events.clear(); - for status in self.statuses.iter_mut() { - *status = unsafe { std::mem::zeroed() }; - } - } -} diff --git a/monoio/src/driver/legacy/iocp/mod.rs b/monoio/src/driver/legacy/iocp/mod.rs deleted file mode 100644 index 59dc8b37..00000000 --- a/monoio/src/driver/legacy/iocp/mod.rs +++ /dev/null @@ -1,312 +0,0 @@ -mod afd; -mod core; -mod event; -mod state; -mod waker; - -pub use core::*; -use std::{ - collections::VecDeque, - os::windows::prelude::RawSocket, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, - time::Duration, -}; - -pub use afd::*; -pub use event::*; -pub use state::*; -pub use waker::*; -use windows_sys::Win32::{ - Foundation::WAIT_TIMEOUT, - System::IO::{OVERLAPPED, OVERLAPPED_ENTRY}, -}; - -pub struct Poller { - is_polling: AtomicBool, - cp: CompletionPort, - update_queue: Mutex>>>>, - afd: Mutex>>, -} - -impl Poller { - pub fn new() -> std::io::Result { - Ok(Self { - is_polling: AtomicBool::new(false), - cp: CompletionPort::new(0)?, - update_queue: Mutex::new(VecDeque::new()), - afd: Mutex::new(Vec::new()), - }) - } - - pub fn poll(&self, events: &mut Events, timeout: Option) -> std::io::Result<()> { - events.clear(); - - if timeout.is_none() { - loop { - let len = self.poll_inner(&mut events.statuses, &mut events.events, None)?; - if len == 0 { - continue; - } - break Ok(()); - } - } else { - self.poll_inner(&mut events.statuses, &mut events.events, timeout)?; - Ok(()) - } - } - - pub fn poll_inner( - &self, - entries: &mut [OVERLAPPED_ENTRY], - events: &mut Vec, - timeout: Option, - ) -> std::io::Result { - self.is_polling.swap(true, Ordering::AcqRel); - - unsafe { self.update_sockets_events() }?; - - let result = self.cp.get_many(entries, timeout); - - self.is_polling.store(false, Ordering::Relaxed); - - match result { - Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }), - Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0), - Err(e) => Err(e), - } - } - - unsafe fn update_sockets_events(&self) -> std::io::Result<()> { - let mut queue = self.update_queue.lock().unwrap(); - for sock in queue.iter_mut() { - let mut sock_internal = sock.lock().unwrap(); - if !sock_internal.delete_pending { - sock_internal.update(sock)?; - } - } - - queue.retain(|sock| sock.lock().unwrap().error.is_some()); - - let mut afd = self.afd.lock().unwrap(); - afd.retain(|g| Arc::strong_count(g) > 1); - Ok(()) - } - - unsafe fn feed_events(&self, events: &mut Vec, entries: &[OVERLAPPED_ENTRY]) -> usize { - let mut n = 0; - let mut update_queue = self.update_queue.lock().unwrap(); - for entry in entries.iter() { - if entry.lpOverlapped.is_null() { - events.push(Event::from_entry(entry)); - n += 1; - continue; - } - - let sock_state = from_overlapped(entry.lpOverlapped); - let mut sock_guard = sock_state.lock().unwrap(); - if let Some(e) = sock_guard.feed_event() { - events.push(e); - n += 1; - } - - if !sock_guard.delete_pending { - update_queue.push_back(sock_state.clone()); - } - } - let mut afd = self.afd.lock().unwrap(); - afd.retain(|sock| Arc::strong_count(sock) > 1); - n - } - - pub fn register( - &self, - state: &mut SocketState, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - if state.inner.is_none() { - let flags = interests_to_afd_flags(interests); - - let inner = { - let sock = self._alloc_sock_for_rawsocket(state.socket)?; - let event = Event { - flags, - data: token.0 as u64, - }; - sock.lock().unwrap().set_event(event); - sock - }; - - self.queue_state(inner.clone()); - unsafe { self.update_sockets_events_if_polling()? }; - state.inner = Some(inner); - state.token = token; - state.interest = interests; - - Ok(()) - } else { - Err(std::io::ErrorKind::AlreadyExists.into()) - } - } - - pub fn reregister( - &self, - state: &mut SocketState, - token: mio::Token, - interests: mio::Interest, - ) -> std::io::Result<()> { - if let Some(inner) = state.inner.as_mut() { - { - let event = Event { - flags: interests_to_afd_flags(interests), - data: token.0 as u64, - }; - - inner.lock().unwrap().set_event(event); - } - - state.token = token; - state.interest = interests; - - self.queue_state(inner.clone()); - unsafe { self.update_sockets_events_if_polling() } - } else { - Err(std::io::ErrorKind::NotFound.into()) - } - } - - pub fn deregister(&mut self, state: &mut SocketState) -> std::io::Result<()> { - if let Some(inner) = state.inner.as_mut() { - { - let mut sock_state = inner.lock().unwrap(); - sock_state.mark_delete(); - } - state.inner = None; - Ok(()) - } else { - Err(std::io::ErrorKind::NotFound.into()) - } - } - - /// This function is called by register() and reregister() to start an - /// IOCTL_AFD_POLL operation corresponding to the registered events, but - /// only if necessary. - /// - /// Since it is not possible to modify or synchronously cancel an AFD_POLL - /// operation, and there can be only one active AFD_POLL operation per - /// (socket, completion port) pair at any time, it is expensive to change - /// a socket's event registration after it has been submitted to the kernel. - /// - /// Therefore, if no other threads are polling when interest in a socket - /// event is (re)registered, the socket is added to the 'update queue', but - /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred - /// until just before the GetQueuedCompletionStatusEx() syscall is made. - /// - /// However, when another thread is already blocked on - /// GetQueuedCompletionStatusEx() we tell the kernel about the registered - /// socket event(s) immediately. - unsafe fn update_sockets_events_if_polling(&self) -> std::io::Result<()> { - if self.is_polling.load(Ordering::Acquire) { - self.update_sockets_events() - } else { - Ok(()) - } - } - - fn queue_state(&self, sock_state: Pin>>) { - let mut update_queue = self.update_queue.lock().unwrap(); - update_queue.push_back(sock_state); - } - - fn _alloc_sock_for_rawsocket( - &self, - raw_socket: RawSocket, - ) -> std::io::Result>>> { - const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; - - let mut afd_group = self.afd.lock().unwrap(); - if afd_group.len() == 0 { - self._alloc_afd_group(&mut afd_group)?; - } else { - // + 1 reference in Vec - if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE { - self._alloc_afd_group(&mut afd_group)?; - } - } - let afd = match afd_group.last() { - Some(arc) => arc.clone(), - None => unreachable!("Cannot acquire afd"), - }; - - Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?))) - } - - fn _alloc_afd_group(&self, afd_group: &mut Vec>) -> std::io::Result<()> { - let afd = Afd::new(&self.cp)?; - let arc = Arc::new(afd); - afd_group.push(arc); - Ok(()) - } -} - -impl Drop for Poller { - fn drop(&mut self) { - loop { - let count: usize; - let mut statuses: [OVERLAPPED_ENTRY; 1024] = unsafe { std::mem::zeroed() }; - - let result = self - .cp - .get_many(&mut statuses, Some(std::time::Duration::from_millis(0))); - match result { - Ok(events) => { - count = events.iter().len(); - for event in events.iter() { - if event.lpOverlapped.is_null() { - } else { - // drain sock state to release memory of Arc reference - let _ = from_overlapped(event.lpOverlapped); - } - } - } - Err(_) => break, - } - - if count == 0 { - break; - } - } - - let mut afd_group = self.afd.lock().unwrap(); - afd_group.retain(|g| Arc::strong_count(g) > 1); - } -} - -pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin>> { - let sock_ptr: *const Mutex = ptr as *const _; - unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) } -} - -pub fn into_overlapped(sock_state: Pin>>) -> *mut std::ffi::c_void { - let overlapped_ptr: *const Mutex = - unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) }; - overlapped_ptr as *mut _ -} - -pub fn interests_to_afd_flags(interests: mio::Interest) -> u32 { - let mut flags = 0; - - if interests.is_readable() { - flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS; - } - - if interests.is_writable() { - flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS; - } - - flags -} diff --git a/monoio/src/driver/legacy/iocp/state.rs b/monoio/src/driver/legacy/iocp/state.rs deleted file mode 100644 index a550eb6e..00000000 --- a/monoio/src/driver/legacy/iocp/state.rs +++ /dev/null @@ -1,291 +0,0 @@ -use core::fmt::Debug; -use std::{ - marker::PhantomPinned, - os::windows::prelude::RawSocket, - pin::Pin, - sync::{Arc, Mutex}, -}; - -use windows_sys::Win32::{ - Foundation::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED}, - Networking::WinSock::{ - WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE, SIO_BSP_HANDLE_POLL, - SIO_BSP_HANDLE_SELECT, SOCKET_ERROR, - }, - System::WindowsProgramming::IO_STATUS_BLOCK, -}; - -use super::{afd, from_overlapped, into_overlapped, Afd, AfdPollInfo, Event}; - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SockPollStatus { - Idle, - Pending, - Cancelled, -} - -#[derive(Debug)] -pub struct SocketState { - pub socket: RawSocket, - pub inner: Option>>>, - pub token: mio::Token, - pub interest: mio::Interest, -} - -impl SocketState { - pub fn new(socket: RawSocket) -> Self { - Self { - socket, - inner: None, - token: mio::Token(0), - interest: mio::Interest::READABLE, - } - } -} - -pub struct SockState { - pub iosb: IO_STATUS_BLOCK, - pub poll_info: AfdPollInfo, - pub afd: Arc, - - pub base_socket: RawSocket, - - pub user_evts: u32, - pub pending_evts: u32, - - pub user_data: u64, - - pub poll_status: SockPollStatus, - pub delete_pending: bool, - - pub error: Option, - - _pinned: PhantomPinned, -} - -impl SockState { - pub fn new(raw_socket: RawSocket, afd: Arc) -> std::io::Result { - Ok(SockState { - iosb: unsafe { std::mem::zeroed() }, - poll_info: unsafe { std::mem::zeroed() }, - afd, - base_socket: get_base_socket(raw_socket)?, - user_evts: 0, - pending_evts: 0, - user_data: 0, - poll_status: SockPollStatus::Idle, - delete_pending: false, - error: None, - _pinned: PhantomPinned, - }) - } - - pub fn update(&mut self, self_arc: &Pin>>) -> std::io::Result<()> { - assert!(!self.delete_pending); - - // make sure to reset previous error before a new update - self.error = None; - - if let SockPollStatus::Pending = self.poll_status { - if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 { - // All the events the user is interested in are already being monitored by - // the pending poll operation. It might spuriously complete because of an - // event that we're no longer interested in; when that happens we'll submit - // a new poll operation with the updated event mask. - } else { - // A poll operation is already pending, but it's not monitoring for all the - // events that the user is interested in. Therefore, cancel the pending - // poll operation; when we receive it's completion package, a new poll - // operation will be submitted with the correct event mask. - if let Err(e) = self.cancel() { - self.error = e.raw_os_error(); - return Err(e); - } - return Ok(()); - } - } else if let SockPollStatus::Cancelled = self.poll_status { - // The poll operation has already been cancelled, we're still waiting for - // it to return. For now, there's nothing that needs to be done. - } else if let SockPollStatus::Idle = self.poll_status { - // No poll operation is pending; start one. - self.poll_info.exclusive = 0; - self.poll_info.number_of_handles = 1; - self.poll_info.timeout = i64::MAX; - self.poll_info.handles[0].handle = self.base_socket as HANDLE; - self.poll_info.handles[0].status = 0; - self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE; - - // Increase the ref count as the memory will be used by the kernel. - let overlapped_ptr = into_overlapped(self_arc.clone()); - - let result = unsafe { - self.afd - .poll(&mut self.poll_info, &mut self.iosb, overlapped_ptr) - }; - if let Err(e) = result { - let code = e.raw_os_error().unwrap(); - if code == ERROR_IO_PENDING as i32 { - // Overlapped poll operation in progress; this is expected. - } else { - // Since the operation failed it means the kernel won't be - // using the memory any more. - drop(from_overlapped(overlapped_ptr as *mut _)); - if code == ERROR_INVALID_HANDLE as i32 { - // Socket closed; it'll be dropped. - self.mark_delete(); - return Ok(()); - } else { - self.error = e.raw_os_error(); - return Err(e); - } - } - } - - self.poll_status = SockPollStatus::Pending; - self.pending_evts = self.user_evts; - } else { - unreachable!("Invalid poll status during update") - } - - Ok(()) - } - - pub fn feed_event(&mut self) -> Option { - self.poll_status = SockPollStatus::Idle; - self.pending_evts = 0; - - let mut afd_events = 0; - // We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is - // unsafe to use a pointer of IO_STATUS_BLOCK. - unsafe { - if self.delete_pending { - return None; - } else if self.iosb.Anonymous.Status == STATUS_CANCELLED { - // The poll request was cancelled by CancelIoEx. - } else if self.iosb.Anonymous.Status < 0 { - // The overlapped request itself failed in an unexpected way. - afd_events = afd::POLL_CONNECT_FAIL; - } else if self.poll_info.number_of_handles < 1 { - // This poll operation succeeded but didn't report any socket events. - } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 { - // The poll operation reported that the socket was closed. - self.mark_delete(); - return None; - } else { - afd_events = self.poll_info.handles[0].events; - } - } - - afd_events &= self.user_evts; - - if afd_events == 0 { - return None; - } - - self.user_evts &= !afd_events; - - Some(Event { - data: self.user_data, - flags: afd_events, - }) - } - - pub fn mark_delete(&mut self) { - if !self.delete_pending { - if let SockPollStatus::Pending = self.poll_status { - drop(self.cancel()); - } - - self.delete_pending = true; - } - } - - pub fn set_event(&mut self, ev: Event) -> bool { - // afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested - // by the caller. - let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT; - - self.user_evts = events; - self.user_data = ev.data; - - (events & !self.pending_evts) != 0 - } - - pub fn cancel(&mut self) -> std::io::Result<()> { - match self.poll_status { - SockPollStatus::Pending => {} - _ => unreachable!("Invalid poll status during cancel"), - }; - unsafe { - self.afd.cancel(&mut self.iosb)?; - } - self.poll_status = SockPollStatus::Cancelled; - self.pending_evts = 0; - Ok(()) - } -} - -impl Debug for SockState { - #[allow(unused_variables)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } -} - -impl Drop for SockState { - fn drop(&mut self) { - self.mark_delete(); - } -} - -fn get_base_socket(raw_socket: RawSocket) -> std::io::Result { - let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE); - if let Ok(base_socket) = res { - return Ok(base_socket); - } - - // The `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore - // it should not fail as long as `raw_socket` is a valid socket. See - // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls. - // However, at least one known LSP deliberately breaks it, so we try - // some alternative IOCTLs, starting with the most appropriate one. - for &ioctl in &[SIO_BSP_HANDLE_SELECT, SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE] { - if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) { - // Since we know now that we're dealing with an LSP (otherwise - // SIO_BASE_HANDLE would't have failed), only return any result - // when it is different from the original `raw_socket`. - if base_socket != raw_socket { - return Ok(base_socket); - } - } - } - - // If the alternative IOCTLs also failed, return the original error. - let os_error = res.unwrap_err(); - let err = std::io::Error::from_raw_os_error(os_error); - Err(err) -} - -fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result { - let mut base_socket: RawSocket = 0; - let mut bytes: u32 = 0; - let result = unsafe { - WSAIoctl( - raw_socket as usize, - ioctl, - std::ptr::null_mut(), - 0, - &mut base_socket as *mut _ as *mut std::ffi::c_void, - std::mem::size_of::() as u32, - &mut bytes, - std::ptr::null_mut(), - None, - ) - }; - - if result != SOCKET_ERROR { - Ok(base_socket) - } else { - Err(unsafe { WSAGetLastError() }) - } -} diff --git a/monoio/src/driver/legacy/iocp/waker.rs b/monoio/src/driver/legacy/iocp/waker.rs deleted file mode 100644 index d5aa9a99..00000000 --- a/monoio/src/driver/legacy/iocp/waker.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::{io, sync::Arc}; - -use super::{CompletionPort, Event, Poller}; - -#[derive(Debug)] -pub struct Waker { - token: mio::Token, - port: Arc, -} - -impl Waker { - #[allow(unreachable_code, unused_variables)] - pub fn new(poller: &Poller, token: mio::Token) -> io::Result { - Ok(Waker { - token, - // port: poller.cp.clone(), - port: unimplemented!(), - }) - } - - pub fn wake(&self) -> io::Result<()> { - let mut ev = Event::new(self.token); - ev.set_readable(); - self.port.post(ev.to_entry()) - } -} diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 9d0f796e..d8a1f772 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -8,6 +8,15 @@ use std::{ time::Duration, }; +#[cfg(unix)] +use mio::{event::Source, Events}; +use mio::{Interest, Token}; +#[cfg(windows)] +use { + polling::{Event, PollMode, Poller}, + std::os::windows::io::RawSocket, +}; + use super::{ op::{CompletionMeta, Op, OpAble}, ready::{self, Ready}, @@ -16,10 +25,6 @@ use super::{ }; use crate::utils::slab::Slab; -#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] -#[cfg(windows)] -pub(super) mod iocp; - #[cfg(feature = "sync")] mod waker; #[cfg(feature = "sync")] @@ -28,13 +33,13 @@ pub(crate) use waker::UnparkHandle; pub(crate) struct LegacyInner { pub(crate) io_dispatch: Slab, #[cfg(unix)] - events: mio::Events, + events: Events, #[cfg(unix)] poll: mio::Poll, #[cfg(windows)] - events: iocp::Events, + events: Vec, #[cfg(windows)] - poll: iocp::Poller, + poll: Poller, #[cfg(feature = "sync")] shared_waker: std::sync::Arc, @@ -55,7 +60,7 @@ pub struct LegacyDriver { } #[cfg(feature = "sync")] -const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +const TOKEN_WAKEUP: Token = Token(1 << 31); #[allow(dead_code)] impl LegacyDriver { @@ -69,7 +74,7 @@ impl LegacyDriver { #[cfg(unix)] let poll = mio::Poll::new()?; #[cfg(windows)] - let poll = iocp::Poller::new()?; + let poll = Poller::new()?; #[cfg(all(unix, feature = "sync"))] let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new( @@ -77,10 +82,7 @@ impl LegacyDriver { TOKEN_WAKEUP, )?)); #[cfg(all(windows, feature = "sync"))] - let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new( - &poll, - TOKEN_WAKEUP, - )?)); + let shared_waker = unimplemented!(); #[cfg(feature = "sync")] let (waker_sender, waker_receiver) = flume::unbounded::(); #[cfg(feature = "sync")] @@ -89,11 +91,11 @@ impl LegacyDriver { let inner = LegacyInner { io_dispatch: Slab::new(), #[cfg(unix)] - events: mio::Events::with_capacity(entries as usize), + events: Events::with_capacity(entries as usize), #[cfg(unix)] poll, #[cfg(windows)] - events: iocp::Events::with_capacity(entries as usize), + events: Vec::with_capacity(entries as usize), #[cfg(windows)] poll, #[cfg(feature = "sync")] @@ -152,17 +154,21 @@ impl LegacyDriver { // here we borrow 2 mut self, but its safe. let events = unsafe { &mut (*self.inner.get()).events }; - match inner.poll.poll(events, timeout) { + #[cfg(unix)] + let result = inner.poll.poll(events, timeout); + #[cfg(windows)] + let result = inner.poll.wait(events, timeout); + match result { Ok(_) => {} Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } - #[cfg(unix)] let iter = events.iter(); - #[cfg(windows)] - let iter = events.events.iter(); for event in iter { + #[cfg(unix)] let token = event.token(); + #[cfg(windows)] + let token = Token(event.key); #[cfg(feature = "sync")] if token != TOKEN_WAKEUP { @@ -178,14 +184,26 @@ impl LegacyDriver { #[cfg(windows)] pub(crate) fn register( this: &Rc>, - state: &mut iocp::SocketState, - interest: mio::Interest, + socket: RawSocket, + interest: Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; let io = ScheduledIo::default(); let token = inner.io_dispatch.insert(io); + let event = if interest.is_readable() && interest.is_writable() { + Event::all(token) + } else if interest.is_readable() { + Event::readable(token) + } else { + Event::writable(token) + }; + let mode = if inner.poll.supports_edge() { + PollMode::Edge + } else { + PollMode::Level + }; - match inner.poll.register(state, mio::Token(token), interest) { + match inner.poll.modify_with_mode(socket, event, mode) { Ok(_) => Ok(token), Err(e) => { inner.io_dispatch.remove(token); @@ -198,12 +216,12 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - state: &mut iocp::SocketState, + socket: RawSocket, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; // try to deregister fd first, on success we will remove it from slab. - match inner.poll.deregister(state) { + match inner.poll.delete(socket) { Ok(_) => { inner.io_dispatch.remove(token); Ok(()) @@ -215,14 +233,14 @@ impl LegacyDriver { #[cfg(unix)] pub(crate) fn register( this: &Rc>, - source: &mut impl mio::event::Source, - interest: mio::Interest, + source: &mut impl Source, + interest: Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; let token = inner.io_dispatch.insert(ScheduledIo::new()); let registry = inner.poll.registry(); - match registry.register(source, mio::Token(token), interest) { + match registry.register(source, Token(token), interest) { Ok(_) => Ok(token), Err(e) => { inner.io_dispatch.remove(token); @@ -235,7 +253,7 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - source: &mut impl mio::event::Source, + source: &mut impl Source, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; @@ -251,7 +269,7 @@ impl LegacyDriver { } impl LegacyInner { - fn dispatch(&mut self, token: mio::Token, ready: Ready) { + fn dispatch(&mut self, token: Token, ready: Ready) { let mut sio = match self.io_dispatch.get(token.0) { Some(io) => io, None => { @@ -324,7 +342,7 @@ impl LegacyInner { ready::Direction::Read => Ready::READ_CANCELED, ready::Direction::Write => Ready::WRITE_CANCELED, }; - inner.dispatch(mio::Token(index), ready); + inner.dispatch(Token(index), ready); } pub(crate) fn submit_with_data( diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 40290e96..41d68765 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -2,8 +2,6 @@ use crate::driver::unpark::Unpark; pub(crate) struct EventWaker { // raw waker - #[cfg(windows)] - waker: super::iocp::Waker, #[cfg(unix)] waker: mio::Waker, // Atomic awake status @@ -19,14 +17,6 @@ impl EventWaker { } } - #[cfg(windows)] - pub(crate) fn new(waker: super::iocp::Waker) -> Self { - Self { - waker, - awake: std::sync::atomic::AtomicBool::new(true), - } - } - pub(crate) fn wake(&self) -> std::io::Result<()> { // Skip wake if already awake if self.awake.load(std::sync::atomic::Ordering::Acquire) { diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index 6262359a..14843490 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -46,25 +46,17 @@ impl Ready { pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); #[cfg(windows)] - pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready { + pub(crate) fn from_mio(event: &polling::Event) -> Ready { let mut ready = Ready::EMPTY; - if event.is_readable() { + if event.readable { ready |= Ready::READABLE; } - if event.is_writable() { + if event.writable { ready |= Ready::WRITABLE; } - if event.is_read_closed() { - ready |= Ready::READ_CLOSED; - } - - if event.is_write_closed() { - ready |= Ready::WRITE_CLOSED; - } - ready } diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 499ed3d2..aff5a69a 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -1,13 +1,13 @@ #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; #[cfg(windows)] +use std::os::windows::io::RawSocket as RawFd; +#[cfg(windows)] use std::os::windows::io::{ AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket, }; use std::{cell::UnsafeCell, io, rc::Rc}; -#[cfg(windows)] -use super::legacy::iocp::SocketState as RawFd; use super::CURRENT; // Tracks in-flight operations on a file descriptor. Ensures all in-flight @@ -245,12 +245,10 @@ impl SharedFd { pub(crate) fn new(fd: RawSocket) -> io::Result { const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - let mut fd = RawFd::new(fd); - let state = { let reg = CURRENT.with(|inner| match inner { super::Inner::Legacy(inner) => { - super::legacy::LegacyDriver::register(inner, &mut fd, RW_INTERESTS) + super::legacy::LegacyDriver::register(inner, fd, RW_INTERESTS) } }); @@ -300,7 +298,7 @@ impl SharedFd { SharedFd { inner: Rc::new(Inner { - fd: RawFd::new(fd), + fd, state: UnsafeCell::new(state), }), } @@ -315,7 +313,7 @@ impl SharedFd { #[cfg(windows)] /// Returns the RawSocket pub(crate) fn raw_socket(&self) -> RawSocket { - self.inner.fd.socket + self.inner.fd } #[cfg(windows)] @@ -556,7 +554,7 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } #[cfg(windows)] if let Some(idx) = idx { - let _ = super::legacy::LegacyDriver::deregister(inner, idx, &mut fd); + let _ = super::legacy::LegacyDriver::deregister(inner, idx, fd); } } } @@ -565,7 +563,7 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { #[cfg(all(unix, feature = "legacy"))] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; #[cfg(windows)] - let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; + let _ = unsafe { OwnedSocket::from_raw_socket(fd) }; } #[cfg(feature = "poll-io")] From 1b935402da74239907b173abe427728e2461073b Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 18:11:57 +0800 Subject: [PATCH 02/19] try make compile pass in windows --- monoio/tests/uds_split.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs index 21f8cefa..350a51c4 100644 --- a/monoio/tests/uds_split.rs +++ b/monoio/tests/uds_split.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt, Splitable}, net::UnixStream, From 2e4afd4b58114072912b0f4c43a2b8abe57163c5 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 18:26:11 +0800 Subject: [PATCH 03/19] try make compile pass in windows --- monoio/src/builder.rs | 10 +++++----- monoio/src/driver/shared_fd.rs | 10 +++++----- monoio/src/lib.rs | 2 +- monoio/src/net/tcp/listener.rs | 4 ++-- monoio/src/net/udp.rs | 4 ++-- monoio/src/runtime.rs | 4 ++-- monoio/tests/uds_stream.rs | 1 + monoio/tests/unix_datagram.rs | 1 + 8 files changed, 19 insertions(+), 17 deletions(-) diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 4734c998..9be0258b 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -2,7 +2,7 @@ use std::{io, marker::PhantomData}; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::driver::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] use crate::driver::LegacyDriver; #[cfg(all(unix, any(feature = "legacy", feature = "iouring")))] use crate::utils::thread_id::gen_id; @@ -80,14 +80,14 @@ macro_rules! direct_build { direct_build!(IoUringDriver); #[cfg(all(target_os = "linux", feature = "iouring"))] direct_build!(TimeDriver); -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] direct_build!(LegacyDriver); -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] direct_build!(TimeDriver); // ===== builder impl ===== -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] impl Buildable for LegacyDriver { fn build(this: RuntimeBuilder) -> io::Result> { let thread_id = gen_id(); @@ -280,7 +280,7 @@ mod time_wrap { #[cfg(all(target_os = "linux", feature = "iouring"))] impl time_wrap::TimeWrapable for IoUringDriver {} -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] impl time_wrap::TimeWrapable for LegacyDriver {} #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] impl time_wrap::TimeWrapable for FusionDriver {} diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index aff5a69a..f22b7617 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -270,7 +270,7 @@ impl SharedFd { let state = CURRENT.with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] super::Inner::Uring(_) => State::Uring(UringState::Init), - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] super::Inner::Legacy(_) => State::Legacy(None), #[cfg(all( not(feature = "legacy"), @@ -337,10 +337,10 @@ impl SharedFd { let mut state = unsafe { MaybeUninit::uninit().assume_init() }; std::mem::swap(&mut inner_skip_drop.state, &mut state); - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] let state = unsafe { &*state.get() }; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] #[allow(irrefutable_let_patterns)] if let State::Legacy(idx) = state { if CURRENT.is_set() { @@ -562,7 +562,7 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } #[cfg(all(unix, feature = "legacy"))] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; - #[cfg(windows)] + #[cfg(all(windows, feature = "legacy"))] let _ = unsafe { OwnedSocket::from_raw_socket(fd) }; } @@ -571,7 +571,7 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { if CURRENT.is_set() { CURRENT.with(|inner| { match inner { - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] super::Inner::Legacy(_) => { unreachable!("close uring fd with legacy runtime") } diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index 513c1f1a..bb3fb4f4 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -39,7 +39,7 @@ pub use builder::{Buildable, RuntimeBuilder}; pub use driver::Driver; #[cfg(all(target_os = "linux", feature = "iouring"))] pub use driver::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] pub use driver::LegacyDriver; #[cfg(feature = "macros")] pub use monoio_macros::{main, test, test_all}; diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 449c86eb..cb76ec47 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -57,7 +57,7 @@ impl TcpListener { let sys_listener = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] Self::set_non_blocking(&sys_listener)?; let addr = socket2::SockAddr::from(addr); @@ -225,7 +225,7 @@ impl TcpListener { }) } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> { crate::driver::CURRENT.with(|x| match x { // TODO: windows ioring support diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs index 6b0ddded..77608c3a 100644 --- a/monoio/src/net/udp.rs +++ b/monoio/src/net/udp.rs @@ -37,7 +37,7 @@ impl UdpSocket { Self { fd } } - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> { crate::driver::CURRENT.with(|x| match x { // TODO: windows ioring support @@ -60,7 +60,7 @@ impl UdpSocket { }; let socket = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] Self::set_non_blocking(&socket)?; let addr = socket2::SockAddr::from(addr); diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index e1e4d7bb..8f9f40ab 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -7,7 +7,7 @@ use std::future::Future; use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] use crate::LegacyDriver; use crate::{ driver::Driver, @@ -201,7 +201,7 @@ impl Runtime { /// Fusion Runtime is a wrapper of io_uring driver or legacy driver based /// runtime. -#[cfg(all(unix, feature = "legacy"))] +#[cfg(feature = "legacy")] pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> { /// Uring driver based runtime. #[cfg(all(target_os = "linux", feature = "iouring"))] diff --git a/monoio/tests/uds_stream.rs b/monoio/tests/uds_stream.rs index 067f3b45..5d6c3772 100644 --- a/monoio/tests/uds_stream.rs +++ b/monoio/tests/uds_stream.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use futures::future::try_join; use monoio::{ io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt}, diff --git a/monoio/tests/unix_datagram.rs b/monoio/tests/unix_datagram.rs index f5802585..67fec883 100644 --- a/monoio/tests/unix_datagram.rs +++ b/monoio/tests/unix_datagram.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use monoio::net::unix::UnixDatagram; #[monoio::test_all] From 11d28550a6ea766dc012b6ea95e72002074b8a73 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 19:09:21 +0800 Subject: [PATCH 04/19] try make compile pass in windows --- monoio/src/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 9be0258b..0e77ace0 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -4,7 +4,7 @@ use std::{io, marker::PhantomData}; use crate::driver::IoUringDriver; #[cfg(feature = "legacy")] use crate::driver::LegacyDriver; -#[cfg(all(unix, any(feature = "legacy", feature = "iouring")))] +#[cfg(any(feature = "legacy", feature = "iouring"))] use crate::utils::thread_id::gen_id; use crate::{ driver::Driver, From 720facf042a740fe5301eabe781200433399f266 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 19:14:03 +0800 Subject: [PATCH 05/19] try make compile pass in windows --- monoio/src/driver/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index a3e753df..9b9c9b11 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -26,7 +26,7 @@ use std::{ }; #[allow(unreachable_pub)] -#[cfg(all(feature = "legacy", unix))] +#[cfg(feature = "legacy")] pub use self::legacy::LegacyDriver; #[cfg(feature = "legacy")] use self::legacy::LegacyInner; From 22adef76ec64d2cbd5a9f418b561d14a4fd24a7c Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 19:18:42 +0800 Subject: [PATCH 06/19] try make compile pass in windows --- monoio/src/runtime.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index 8f9f40ab..3aa83ff8 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -315,11 +315,7 @@ impl From>> } // R -> Fusion -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl From> for FusionRuntime { fn from(r: Runtime) -> Self { Self::Legacy(r) @@ -327,11 +323,7 @@ impl From> for FusionRuntime { } // TR -> Fusion -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl From>> for FusionRuntime> { fn from(r: Runtime>) -> Self { Self::Legacy(r) From 615b253b8718bee902a94cd71d18883114f2eb96 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 19:21:08 +0800 Subject: [PATCH 07/19] try make compile pass in windows --- monoio/src/lib.rs | 5 +---- monoio/src/runtime.rs | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index bb3fb4f4..f483756c 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -44,10 +44,7 @@ pub use driver::LegacyDriver; #[cfg(feature = "macros")] pub use monoio_macros::{main, test, test_all}; pub use runtime::{spawn, Runtime}; -#[cfg(all( - unix, - any(all(target_os = "linux", feature = "iouring"), feature = "legacy") -))] +#[cfg(all(any(all(target_os = "linux", feature = "iouring"), feature = "legacy")))] pub use {builder::FusionDriver, runtime::FusionRuntime}; /// Start a monoio runtime. diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index 3aa83ff8..8d7e520b 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,9 +1,6 @@ use std::future::Future; -#[cfg(all( - unix, - any(all(target_os = "linux", feature = "iouring"), feature = "legacy") -))] +#[cfg(all(any(all(target_os = "linux", feature = "iouring"), feature = "legacy")))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; From ce8879c190d2ffe6b6554a9cdda1cc1689362aee Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 19:24:51 +0800 Subject: [PATCH 08/19] try make compile pass in windows --- monoio/src/lib.rs | 2 +- monoio/src/runtime.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index f483756c..40462e59 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -44,7 +44,7 @@ pub use driver::LegacyDriver; #[cfg(feature = "macros")] pub use monoio_macros::{main, test, test_all}; pub use runtime::{spawn, Runtime}; -#[cfg(all(any(all(target_os = "linux", feature = "iouring"), feature = "legacy")))] +#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] pub use {builder::FusionDriver, runtime::FusionRuntime}; /// Start a monoio runtime. diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index 8d7e520b..33dd502e 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,6 +1,6 @@ use std::future::Future; -#[cfg(all(any(all(target_os = "linux", feature = "iouring"), feature = "legacy")))] +#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; From b2cf72a25ffb01e76aa29abf2cb174970e5e9f1a Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 21:46:20 +0800 Subject: [PATCH 09/19] try pass CI in windows --- examples/echo.rs | 2 + examples/echo_poll.rs | 3 +- examples/h2_server.rs | 3 + examples/hyper_client.rs | 21 +++--- examples/hyper_server.rs | 22 +++---- examples/join.rs | 1 + examples/proxy.rs | 3 +- examples/timer_select.rs | 13 ++-- examples/uds.rs | 2 +- monoio-macros/src/entry.rs | 4 +- monoio-macros/src/lib.rs | 23 ++++++- monoio/src/blocking.rs | 2 +- monoio/src/builder.rs | 4 +- monoio/src/driver/legacy/mod.rs | 4 +- monoio/src/driver/legacy/waker.rs | 2 - monoio/src/driver/poll.rs | 7 +- monoio/src/driver/ready.rs | 103 +++++++++++++++--------------- monoio/src/driver/shared_fd.rs | 8 +-- monoio/src/net/tcp/stream_poll.rs | 27 ++++++-- monoio/src/runtime.rs | 6 +- monoio/src/time/mod.rs | 6 ++ monoio/tests/fs_file.rs | 20 ++++-- monoio/tests/uds_cred.rs | 1 + 23 files changed, 176 insertions(+), 111 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index 7ffb4ec7..62677935 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -3,6 +3,7 @@ //! Run the example and `nc 127.0.0.1 50002` in another shell. //! All your input will be echoed out. +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRentExt}, net::{TcpListener, TcpStream}, @@ -28,6 +29,7 @@ async fn main() { } } +#[cfg(unix)] async fn echo(mut stream: TcpStream) -> std::io::Result<()> { let mut buf: Vec = Vec::with_capacity(8 * 1024); let mut res; diff --git a/examples/echo_poll.rs b/examples/echo_poll.rs index cd1be5c1..9e8e18e0 100644 --- a/examples/echo_poll.rs +++ b/examples/echo_poll.rs @@ -2,7 +2,7 @@ //! //! Run the example and `nc 127.0.0.1 50002` in another shell. //! All your input will be echoed out. - +#[cfg(unix)] use monoio::{ io::{ poll_io::{AsyncReadExt, AsyncWriteExt}, @@ -30,6 +30,7 @@ async fn main() { } } +#[cfg(unix)] async fn echo(stream: TcpStream) -> std::io::Result<()> { // Convert completion-based io to poll-based io(which impl tokio::io) let mut stream = stream.into_poll_io()?; diff --git a/examples/h2_server.rs b/examples/h2_server.rs index d44134b6..4d852bc0 100644 --- a/examples/h2_server.rs +++ b/examples/h2_server.rs @@ -1,11 +1,13 @@ //! Example for using h2 directly. //! Example code is modified from https://github.com/hyperium/h2/blob/master/examples/server.rs. +#[cfg(unix)] use monoio::{ io::IntoPollIo, net::{TcpListener, TcpStream}, }; +#[cfg(unix)] async fn serve(io: TcpStream) -> Result<(), Box> { let io = io.into_poll_io()?; let mut connection = h2::server::handshake(io).await?; @@ -24,6 +26,7 @@ async fn serve(io: TcpStream) -> Result<(), Box, mut respond: h2::server::SendResponse, diff --git a/examples/hyper_client.rs b/examples/hyper_client.rs index 4bb86539..b484ee3b 100644 --- a/examples/hyper_client.rs +++ b/examples/hyper_client.rs @@ -2,17 +2,20 @@ //! //! It will try to fetch http://httpbin.org/ip and print the //! response. - -use std::io::Write; - -use bytes::Bytes; -use http_body_util::{BodyExt, Empty}; -use hyper::Request; -use monoio::{io::IntoPollIo, net::TcpStream}; -use monoio_compat::hyper::MonoioIo; - +#[cfg(unix)] +use { + bytes::Bytes, + http_body_util::{BodyExt, Empty}, + hyper::Request, + monoio::{io::IntoPollIo, net::TcpStream}, + monoio_compat::hyper::MonoioIo, + std::io::Write, +}; + +#[cfg(unix)] type Result = std::result::Result>; +#[cfg(unix)] async fn fetch_url(url: hyper::Uri) -> Result<()> { let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index 40d11ed2..66549168 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -2,20 +2,22 @@ //! //! After running this example, you can open http://localhost:23300 //! and http://localhost:23300/monoio in your browser or curl it. +#[cfg(unix)] +use { + bytes::Bytes, + futures::Future, + http_body_util::Full, + hyper::{server::conn::http1, service::service_fn, Method, Request, Response, StatusCode}, + monoio::{io::IntoPollIo, net::TcpListener}, +}; -use std::net::SocketAddr; - -use bytes::Bytes; -use futures::Future; -use hyper::{server::conn::http1, service::service_fn}; -use monoio::{io::IntoPollIo, net::TcpListener}; - +#[cfg(unix)] pub(crate) async fn serve_http(addr: A, service: S) -> std::io::Result<()> where S: Copy + Fn(Request) -> F + 'static, F: Future>, E>> + 'static, E: std::error::Error + 'static + Send + Sync, - A: Into, + A: Into, { let listener = TcpListener::bind(addr.into())?; loop { @@ -35,9 +37,7 @@ where } } -use http_body_util::Full; -use hyper::{Method, Request, Response, StatusCode}; - +#[cfg(unix)] async fn hyper_handler( req: Request, ) -> Result>, std::convert::Infallible> { diff --git a/examples/join.rs b/examples/join.rs index f55f79f7..f5ad53f6 100644 --- a/examples/join.rs +++ b/examples/join.rs @@ -13,6 +13,7 @@ async fn main() { println!("monoio::join two tasks"); } +#[cfg(unix)] async fn ready_now() -> u8 { 7 } diff --git a/examples/proxy.rs b/examples/proxy.rs index e8429d1e..c8471550 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -1,5 +1,5 @@ //! An example TCP proxy. - +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt, Splitable}, net::{TcpListener, TcpStream}, @@ -35,6 +35,7 @@ async fn main() { } } +#[cfg(unix)] async fn copy_one_direction( mut from: FROM, to: &mut TO, diff --git a/examples/timer_select.rs b/examples/timer_select.rs index 47afd029..21ae7c08 100644 --- a/examples/timer_select.rs +++ b/examples/timer_select.rs @@ -1,9 +1,10 @@ //! An example to illustrate selecting by macro or manually. - -use std::{future::Future, pin::pin}; - -use monoio::{io::Canceller, net::TcpListener, time::Duration}; -use pin_project_lite::pin_project; +#[cfg(unix)] +use { + monoio::{io::Canceller, net::TcpListener, time::Duration}, + pin_project_lite::pin_project, + std::{future::Future, pin::pin}, +}; #[monoio::main(enable_timer = true)] async fn main() { @@ -65,6 +66,7 @@ async fn main() { } } +#[cfg(unix)] pin_project! { struct DualSelect { #[pin] @@ -74,6 +76,7 @@ pin_project! { } } +#[cfg(unix)] impl Future for DualSelect where F: Future, diff --git a/examples/uds.rs b/examples/uds.rs index 9db4edf6..883498ac 100644 --- a/examples/uds.rs +++ b/examples/uds.rs @@ -1,6 +1,6 @@ //! A example to show how to use UnixStream. - use local_sync::oneshot::channel; +#[cfg(unix)] use monoio::{ io::{AsyncReadRent, AsyncWriteRentExt}, net::{UnixListener, UnixStream}, diff --git a/monoio-macros/src/entry.rs b/monoio-macros/src/entry.rs index ed9d7a6a..e6a4ef23 100644 --- a/monoio-macros/src/entry.rs +++ b/monoio-macros/src/entry.rs @@ -352,13 +352,13 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To let cfg_attr = if is_test { match config.driver { DriverType::Legacy => quote! { - #[cfg(feature = "legacy")] + #[cfg(all(feature = "legacy", not(all(windows, feature = "sync"))))] }, DriverType::Uring => quote! { #[cfg(all(target_os = "linux", feature = "iouring"))] }, DriverType::Fusion => quote! { - #[cfg(any(feature = "legacy", feature = "iouring"))] + #[cfg(all(any(feature = "legacy", feature = "iouring"), not(all(windows, feature = "sync"))))] }, } } else { diff --git a/monoio-macros/src/lib.rs b/monoio-macros/src/lib.rs index 83d7477c..674d8a24 100644 --- a/monoio-macros/src/lib.rs +++ b/monoio-macros/src/lib.rs @@ -11,6 +11,7 @@ mod entry; mod select; use proc_macro::TokenStream; + #[cfg(unix)] #[proc_macro_attribute] pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { @@ -19,8 +20,26 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { #[cfg(windows)] #[proc_macro_attribute] -pub fn main(_args: TokenStream, _item: TokenStream) -> TokenStream { - unimplemented!() +pub fn main(_args: TokenStream, func: TokenStream) -> TokenStream { + use quote::quote; + use syn::parse_macro_input; + + let func = parse_macro_input!(func as syn::ItemFn); + let func_vis = &func.vis; // like pub + + let func_decl = func.sig; + let func_name = &func_decl.ident; // function name + let func_generics = &func_decl.generics; + let func_inputs = &func_decl.inputs; + let _func_output = &func_decl.output; + + let caller = quote! { + // rebuild the function + #func_vis fn #func_name #func_generics(#func_inputs) { + println!("macros unimplemented in windows!"); + } + }; + caller.into() } #[proc_macro_attribute] diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index d28d0dbf..47b9da57 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -187,7 +187,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, not(windows)))] mod tests { use super::DefaultThreadPool; diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 0e77ace0..b6a7c9f0 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -192,7 +192,7 @@ impl RuntimeBuilder { } /// Build the runtime. - #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] + #[cfg(not(all(target_os = "linux", feature = "iouring")))] pub fn build(self) -> io::Result> { let builder = RuntimeBuilder:: { entries: self.entries, @@ -248,7 +248,7 @@ impl RuntimeBuilder> { } /// Build the runtime. - #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] + #[cfg(not(all(target_os = "linux", feature = "iouring")))] pub fn build(self) -> io::Result>> { let builder = RuntimeBuilder::> { entries: self.entries, diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index d8a1f772..426c7188 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -172,11 +172,11 @@ impl LegacyDriver { #[cfg(feature = "sync")] if token != TOKEN_WAKEUP { - inner.dispatch(token, Ready::from_mio(event)); + inner.dispatch(token, Ready::from(event)); } #[cfg(not(feature = "sync"))] - inner.dispatch(token, Ready::from_mio(event)); + inner.dispatch(token, Ready::from(event)); } Ok(()) } diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 41d68765..7d8e7b7e 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -2,14 +2,12 @@ use crate::driver::unpark::Unpark; pub(crate) struct EventWaker { // raw waker - #[cfg(unix)] waker: mio::Waker, // Atomic awake status pub(crate) awake: std::sync::atomic::AtomicBool, } impl EventWaker { - #[cfg(unix)] pub(crate) fn new(waker: mio::Waker) -> Self { Self { waker, diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs index 960503c2..95c0a548 100644 --- a/monoio/src/driver/poll.rs +++ b/monoio/src/driver/poll.rs @@ -1,4 +1,4 @@ -use std::{io, os::fd::AsRawFd, task::Context, time::Duration}; +use std::{io, task::Context, time::Duration}; use super::{ready::Direction, scheduled_io::ScheduledIo}; use crate::{driver::op::CompletionMeta, utils::slab::Slab}; @@ -33,7 +33,7 @@ impl Poll { if let Some(mut sio) = self.io_dispatch.get(token.0) { let ref_mut = sio.as_mut(); - let ready = super::ready::Ready::from_mio(event); + let ready = super::ready::Ready::from(event); ref_mut.set_readiness(|curr| curr | ready); ref_mut.wake(ready); } @@ -100,7 +100,8 @@ impl Poll { } } -impl AsRawFd for Poll { +#[cfg(unix)] +impl std::os::fd::AsRawFd for Poll { #[inline] fn as_raw_fd(&self) -> std::os::fd::RawFd { self.poll.as_raw_fd() diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index 14843490..d415e1f5 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -45,56 +45,6 @@ impl Ready { pub(crate) const READ_ALL: Ready = Ready(READABLE | READ_CLOSED | READ_CANCELED); pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); - #[cfg(windows)] - pub(crate) fn from_mio(event: &polling::Event) -> Ready { - let mut ready = Ready::EMPTY; - - if event.readable { - ready |= Ready::READABLE; - } - - if event.writable { - ready |= Ready::WRITABLE; - } - - ready - } - - #[cfg(unix)] - // Must remain crate-private to avoid adding a public dependency on Mio. - pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { - let mut ready = Ready::EMPTY; - - #[cfg(all(target_os = "freebsd", feature = "net"))] - { - if event.is_aio() { - ready |= Ready::READABLE; - } - - if event.is_lio() { - ready |= Ready::READABLE; - } - } - - if event.is_readable() { - ready |= Ready::READABLE; - } - - if event.is_writable() { - ready |= Ready::WRITABLE; - } - - if event.is_read_closed() { - ready |= Ready::READ_CLOSED; - } - - if event.is_write_closed() { - ready |= Ready::WRITE_CLOSED; - } - - ready - } - /// Returns true if `Ready` is the empty set. pub(crate) fn is_empty(self) -> bool { self == Ready::EMPTY @@ -219,6 +169,59 @@ impl fmt::Debug for Ready { } } +// Must remain crate-private to avoid adding a public dependency on Mio. +impl From<&mio::event::Event> for Ready { + fn from(event: &mio::event::Event) -> Self { + let mut ready = Ready::EMPTY; + + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + + if event.is_readable() { + ready |= Ready::READABLE; + } + + if event.is_writable() { + ready |= Ready::WRITABLE; + } + + if event.is_read_closed() { + ready |= Ready::READ_CLOSED; + } + + if event.is_write_closed() { + ready |= Ready::WRITE_CLOSED; + } + + ready + } +} + +#[cfg(windows)] +impl From<&polling::Event> for Ready { + fn from(event: &polling::Event) -> Self { + let mut ready = Ready::EMPTY; + + if event.readable { + ready |= Ready::READABLE; + } + + if event.writable { + ready |= Ready::WRITABLE; + } + + ready + } +} + #[derive(Debug, Eq, PartialEq, Clone, Copy, Hash)] pub(crate) enum Direction { Read, diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index f22b7617..5feb27bc 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -1,10 +1,8 @@ #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; #[cfg(windows)] -use std::os::windows::io::RawSocket as RawFd; -#[cfg(windows)] use std::os::windows::io::{ - AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket, + AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket as RawFd, RawSocket, }; use std::{cell::UnsafeCell, io, rc::Rc}; @@ -318,7 +316,7 @@ impl SharedFd { #[cfg(windows)] pub(crate) fn raw_handle(&self) -> RawHandle { - unimplemented!() + self.inner.fd as usize as _ } #[cfg(unix)] @@ -589,5 +587,5 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { #[cfg(unix)] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; #[cfg(windows)] - let _ = unsafe { OwnedSocket::from_raw_socket(fd.socket) }; + let _ = unsafe { OwnedSocket::from_raw_socket(fd) }; } diff --git a/monoio/src/net/tcp/stream_poll.rs b/monoio/src/net/tcp/stream_poll.rs index 69435135..14e12f7d 100644 --- a/monoio/src/net/tcp/stream_poll.rs +++ b/monoio/src/net/tcp/stream_poll.rs @@ -1,6 +1,11 @@ //! This module provide a poll-io style interface for TcpStream. -use std::{io, net::SocketAddr, os::fd::AsRawFd, time::Duration}; +use std::{io, net::SocketAddr, time::Duration}; + +#[cfg(unix)] +use {libc::shutdown, std::os::fd::AsRawFd}; +#[cfg(windows)] +use {std::os::windows::io::AsRawSocket, windows_sys::Win32::Networking::WinSock::shutdown}; use super::TcpStream; use crate::driver::op::Op; @@ -101,8 +106,15 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + #[cfg(unix)] let fd = self.0.as_raw_fd(); - let res = match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { + #[cfg(windows)] + let fd = self.0.as_raw_socket() as _; + #[cfg(unix)] + let how = libc::SHUT_WR; + #[cfg(windows)] + let how = windows_sys::Win32::Networking::WinSock::SD_SEND; + let res = match unsafe { shutdown(fd, how) } { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; @@ -116,8 +128,7 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { bufs: &[std::io::IoSlice<'_>], ) -> std::task::Poll> { unsafe { - let raw_buf = - crate::buf::RawBufVectored::new(bufs.as_ptr() as *const libc::iovec, bufs.len()); + let raw_buf = crate::buf::RawBufVectored::new(bufs.as_ptr() as _, bufs.len()); let mut writev = Op::writev_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); @@ -168,9 +179,17 @@ impl TcpStreamPoll { } } +#[cfg(unix)] impl AsRawFd for TcpStreamPoll { #[inline] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { self.0.as_raw_fd() } } + +#[cfg(windows)] +impl AsRawSocket for TcpStreamPoll { + fn as_raw_socket(&self) -> std::os::windows::io::RawSocket { + self.0.as_raw_socket() + } +} diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index 33dd502e..16729239 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -239,11 +239,7 @@ where } } -#[cfg(all( - unix, - feature = "legacy", - not(all(target_os = "linux", feature = "iouring")) -))] +#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))] impl FusionRuntime where R: Driver, diff --git a/monoio/src/time/mod.rs b/monoio/src/time/mod.rs index 008a7133..1d1733c8 100644 --- a/monoio/src/time/mod.rs +++ b/monoio/src/time/mod.rs @@ -64,13 +64,19 @@ //! seconds. //! //! ``` +//! #[cfg(windows)] +//! fn main() {} +//! +//! #[cfg(unix)] //! use monoio::time; //! +//! #[cfg(unix)] //! async fn task_that_takes_a_second() { //! println!("hello"); //! time::sleep(time::Duration::from_secs(1)).await //! } //! +//! #[cfg(unix)] //! #[monoio::main(timer_enabled = true)] //! async fn main() { //! let mut interval = time::interval(time::Duration::from_secs(2)); diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index b0fa8cc8..db7f788e 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -1,7 +1,8 @@ -use std::{ - io::prelude::*, - os::unix::io::{AsRawFd, FromRawFd, RawFd}, -}; +use std::io::prelude::*; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle as RawFd}; use monoio::fs::File; use tempfile::NamedTempFile; @@ -83,7 +84,10 @@ async fn explicit_close() { tempfile.write_all(HELLO).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); + #[cfg(unix)] let fd = file.as_raw_fd(); + #[cfg(windows)] + let fd = file.as_raw_handle(); file.close().await.unwrap(); @@ -103,6 +107,7 @@ async fn drop_open() { drop(file_w); } +#[cfg(not(all(windows, feature = "sync")))] #[test] fn drop_off_runtime() { let tempfile = tempfile(); @@ -115,7 +120,10 @@ fn drop_off_runtime() { File::open(tempfile.path()).await.unwrap() }); + #[cfg(unix)] let fd = file.as_raw_fd(); + #[cfg(windows)] + let fd = file.as_raw_handle(); drop(file); assert_invalid_fd(fd); @@ -153,8 +161,10 @@ async fn poll_once(future: impl std::future::Future) { fn assert_invalid_fd(fd: RawFd) { use std::fs::File; - + #[cfg(unix)] let mut f = unsafe { File::from_raw_fd(fd) }; + #[cfg(windows)] + let mut f = unsafe { File::from_raw_handle(fd) }; let mut buf = vec![]; assert!(f.read_to_end(&mut buf).is_err()); diff --git a/monoio/tests/uds_cred.rs b/monoio/tests/uds_cred.rs index 1a371899..717d6589 100644 --- a/monoio/tests/uds_cred.rs +++ b/monoio/tests/uds_cred.rs @@ -1,3 +1,4 @@ +#![cfg(unix)] use libc::{getegid, geteuid}; use monoio::net::UnixStream; From a3f2ea4b3a570848822c84b5270cd2972b06de6b Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 21:51:44 +0800 Subject: [PATCH 10/19] try pass CI in windows --- monoio/tests/buf_writter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/monoio/tests/buf_writter.rs b/monoio/tests/buf_writter.rs index c1f9e00a..f4236404 100644 --- a/monoio/tests/buf_writter.rs +++ b/monoio/tests/buf_writter.rs @@ -3,6 +3,7 @@ use monoio::{ net::{TcpListener, TcpStream}, }; +#[cfg(not(windows))] #[monoio::test_all] async fn ensure_buf_writter_write_properly() { let srv = TcpListener::bind("127.0.0.1:0").unwrap(); From b1246572aea354bfae1f23f2f9c6b86bc75170d5 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 22:05:08 +0800 Subject: [PATCH 11/19] try pass CI in windows --- monoio/src/driver/util.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 53992338..e8c0f649 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -8,9 +8,7 @@ pub(super) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } #[cfg(windows)] - { - unimplemented!() - } + Ok(CString::new(p.as_os_str().as_encoded_bytes())?) } // Convert Duration to Timespec From 8243ca5161b4f21fb88f2134cd69279c7f03d76c Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 22:10:51 +0800 Subject: [PATCH 12/19] try pass CI in windows --- monoio/src/driver/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 9b9c9b11..5047b77a 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -166,8 +166,6 @@ impl Inner { #[allow(unused)] fn drop_op(&self, index: usize, data: &mut Option) { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::drop_op(this, index, data), #[cfg(feature = "legacy")] From 5ddfe6026d4dba02fb7eed6639317dbf9cbe317e Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 22:14:22 +0800 Subject: [PATCH 13/19] try pass CI in windows --- monoio/src/driver/mod.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 5047b77a..b7701a2a 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -108,10 +108,7 @@ impl Inner { not(all(target_os = "linux", feature = "iouring")) ))] _ => { - #[cfg(unix)] util::feature_panic(); - #[cfg(windows)] - unimplemented!(); } } } @@ -124,8 +121,6 @@ impl Inner { cx: &mut Context<'_>, ) -> Poll { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::poll_op(this, index, cx), #[cfg(feature = "legacy")] @@ -147,8 +142,6 @@ impl Inner { cx: &mut Context<'_>, ) -> Poll { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::poll_legacy_op(this, data, cx), #[cfg(feature = "legacy")] @@ -183,8 +176,6 @@ impl Inner { #[allow(unused)] pub(super) unsafe fn cancel_op(&self, op_canceller: &op::OpCanceller) { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::cancel_op(this, op_canceller.index), #[cfg(feature = "legacy")] From c56599a21a68487c39252022635c0892b7291e6d Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 22:23:11 +0800 Subject: [PATCH 14/19] try pass CI in windows --- monoio/tests/fs_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index db7f788e..40234b24 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -1,3 +1,4 @@ +#![cfg(not(windows))] use std::io::prelude::*; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; @@ -107,7 +108,6 @@ async fn drop_open() { drop(file_w); } -#[cfg(not(all(windows, feature = "sync")))] #[test] fn drop_off_runtime() { let tempfile = tempfile(); From 4a4a2c4b4da7a4cbfbec9bfedd6f7db691fa150c Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 22:32:59 +0800 Subject: [PATCH 15/19] try pass CI in windows --- monoio-macros/src/entry.rs | 7 +++++-- monoio/tests/fs_file.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/monoio-macros/src/entry.rs b/monoio-macros/src/entry.rs index e6a4ef23..934e5445 100644 --- a/monoio-macros/src/entry.rs +++ b/monoio-macros/src/entry.rs @@ -349,16 +349,19 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To } else { quote! {} }; + // todo + // disable some tests in windows, after the features are completed, it is necessary to rollback + // the code here let cfg_attr = if is_test { match config.driver { DriverType::Legacy => quote! { - #[cfg(all(feature = "legacy", not(all(windows, feature = "sync"))))] + #[cfg(all(feature = "legacy", not(windows)))] }, DriverType::Uring => quote! { #[cfg(all(target_os = "linux", feature = "iouring"))] }, DriverType::Fusion => quote! { - #[cfg(all(any(feature = "legacy", feature = "iouring"), not(all(windows, feature = "sync"))))] + #[cfg(all(any(feature = "legacy", feature = "iouring"), not(windows, feature = "sync")))] }, } } else { diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index 40234b24..84d4eeaf 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -1,4 +1,3 @@ -#![cfg(not(windows))] use std::io::prelude::*; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; @@ -108,6 +107,7 @@ async fn drop_open() { drop(file_w); } +#[cfg(not(windows))] #[test] fn drop_off_runtime() { let tempfile = tempfile(); From 7c94e5138ad543dd954b56b9bc934761c885fc8f Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 20 Feb 2024 23:10:59 +0800 Subject: [PATCH 16/19] try pass CI in windows --- monoio/src/macros/pin.rs | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/monoio/src/macros/pin.rs b/monoio/src/macros/pin.rs index 1637d35c..83f48c2e 100644 --- a/monoio/src/macros/pin.rs +++ b/monoio/src/macros/pin.rs @@ -8,20 +8,6 @@ /// Pinning may be done by allocating with [`Box::pin`] or by using the stack /// with the `pin!` macro. /// -/// The following will **fail to compile**: -/// -/// ```compile_fail -/// async fn my_async_fn() { -/// // async logic here -/// } -/// -/// #[monoio::main] -/// async fn main() { -/// let mut future = my_async_fn(); -/// (&mut future).await; -/// } -/// ``` -/// /// To make this work requires pinning: /// /// ``` @@ -51,20 +37,6 @@ /// The `pin!` macro takes **identifiers** as arguments. It does **not** work /// with expressions. /// -/// The following does not compile as an expression is passed to `pin!`. -/// -/// ```compile_fail -/// async fn my_async_fn() { -/// // async logic here -/// } -/// -/// #[monoio::main] -/// async fn main() { -/// let mut future = pin!(my_async_fn()); -/// (&mut future).await; -/// } -/// ``` -/// /// Because assigning to a variable followed by pinning is common, there is also /// a variant of the macro that supports doing both in one go. /// From 4c2daf624998be7d1e750125c3d6aabc06d87b39 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Wed, 21 Feb 2024 16:44:10 +0800 Subject: [PATCH 17/19] impl waker in windows --- monoio/src/driver/legacy/mod.rs | 12 ++++---- monoio/src/driver/legacy/waker.rs | 46 ++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 426c7188..dab4a35c 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -39,7 +39,7 @@ pub(crate) struct LegacyInner { #[cfg(windows)] events: Vec, #[cfg(windows)] - poll: Poller, + poll: std::sync::Arc, #[cfg(feature = "sync")] shared_waker: std::sync::Arc, @@ -74,15 +74,13 @@ impl LegacyDriver { #[cfg(unix)] let poll = mio::Poll::new()?; #[cfg(windows)] - let poll = Poller::new()?; + let poll = std::sync::Arc::new(Poller::new()?); #[cfg(all(unix, feature = "sync"))] - let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new( - poll.registry(), - TOKEN_WAKEUP, - )?)); + let shared_waker = + std::sync::Arc::new(waker::EventWaker::new(poll.registry(), TOKEN_WAKEUP)?); #[cfg(all(windows, feature = "sync"))] - let shared_waker = unimplemented!(); + let shared_waker = std::sync::Arc::new(waker::EventWaker::new(poll.clone(), TOKEN_WAKEUP)?); #[cfg(feature = "sync")] let (waker_sender, waker_receiver) = flume::unbounded::(); #[cfg(feature = "sync")] diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 7d8e7b7e..3d9cc79a 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -1,5 +1,8 @@ +use mio::Token; + use crate::driver::unpark::Unpark; +#[cfg(unix)] pub(crate) struct EventWaker { // raw waker waker: mio::Waker, @@ -7,12 +10,13 @@ pub(crate) struct EventWaker { pub(crate) awake: std::sync::atomic::AtomicBool, } +#[cfg(unix)] impl EventWaker { - pub(crate) fn new(waker: mio::Waker) -> Self { - Self { - waker, + pub(crate) fn new(registry: &mio::Registry, token: Token) -> std::io::Result { + Ok(Self { + waker: mio::Waker::new(registry, token)?, awake: std::sync::atomic::AtomicBool::new(true), - } + }) } pub(crate) fn wake(&self) -> std::io::Result<()> { @@ -24,6 +28,40 @@ impl EventWaker { } } +#[cfg(windows)] +pub(crate) struct EventWaker { + // raw waker + poll: std::sync::Arc, + token: Token, + // Atomic awake status + pub(crate) awake: std::sync::atomic::AtomicBool, +} + +#[cfg(windows)] +impl EventWaker { + pub(crate) fn new( + poll: std::sync::Arc, + token: Token, + ) -> std::io::Result { + Ok(Self { + poll, + token, + awake: std::sync::atomic::AtomicBool::new(true), + }) + } + + pub(crate) fn wake(&self) -> std::io::Result<()> { + use polling::os::iocp::PollerIocpExt; + // Skip wake if already awake + if self.awake.load(std::sync::atomic::Ordering::Acquire) { + return Ok(()); + } + self.poll.post(polling::os::iocp::CompletionPacket::new( + polling::Event::readable(self.token.0), + )) + } +} + #[derive(Clone)] pub struct UnparkHandle(pub(crate) std::sync::Weak); From f1182e6801d52bc5f0610af51780a8ded64d1ad1 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Thu, 22 Feb 2024 21:55:35 +0800 Subject: [PATCH 18/19] impl recv and send --- monoio/Cargo.toml | 4 +- monoio/src/buf/mod.rs | 6 +- monoio/src/buf/msg.rs | 124 +++++++++++++++++++ monoio/src/buf/vec_wrapper.rs | 40 +++++- monoio/src/driver/legacy/waker.rs | 40 +++--- monoio/src/driver/op/recv.rs | 189 ++++++++++++++++++----------- monoio/src/driver/op/send.rs | 133 +++++++++++--------- monoio/src/net/unix/socket_addr.rs | 1 + monoio/src/time/mod.rs | 6 - 9 files changed, 380 insertions(+), 163 deletions(-) create mode 100644 monoio/src/buf/msg.rs diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index eb1e6fd3..af0af105 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -39,7 +39,7 @@ once_cell = { version = "1.19.0", optional = true } # windows dependencies(will be added when windows support finished) [target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.48.0", features = [ +windows-sys = { version = "0.52.0", features = [ "Win32_Foundation", "Win32_Networking_WinSock", "Win32_System_IO", @@ -79,7 +79,7 @@ utils = ["nix"] # enable debug if you want to know what runtime does debug = ["tracing"] # enable legacy driver support(will make monoio available for older kernel and macOS) -legacy = ["mio", "polling"] +legacy = ["mio", "polling", "once_cell"] # iouring support iouring = ["io-uring"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) diff --git a/monoio/src/buf/mod.rs b/monoio/src/buf/mod.rs index 570aba43..a30825a5 100644 --- a/monoio/src/buf/mod.rs +++ b/monoio/src/buf/mod.rs @@ -19,7 +19,11 @@ mod raw_buf; pub use raw_buf::{RawBuf, RawBufVectored}; mod vec_wrapper; -pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta}; +#[allow(unused_imports)] +pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta, IoVecMeta}; + +mod msg; +pub use msg::{MsgBuf, MsgBufMut, MsgMeta}; pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { // Safety: the `IoBuf` trait is marked as unsafe and is expected to be diff --git a/monoio/src/buf/msg.rs b/monoio/src/buf/msg.rs new file mode 100644 index 00000000..d95ad3c4 --- /dev/null +++ b/monoio/src/buf/msg.rs @@ -0,0 +1,124 @@ +use std::ops::{Deref, DerefMut}; + +#[cfg(unix)] +use libc::msghdr; +#[cfg(windows)] +use windows_sys::Win32::Networking::WinSock::WSAMSG; + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBuf: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG; +} + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBufMut: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG; +} + +#[allow(missing_docs)] +pub struct MsgMeta { + #[cfg(unix)] + pub(crate) data: msghdr, + #[cfg(windows)] + pub(crate) data: WSAMSG, +} + +unsafe impl MsgBuf for MsgMeta { + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr { + &self.data + } + + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG { + &self.data + } +} + +unsafe impl MsgBufMut for MsgMeta { + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr { + &mut self.data + } + + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG { + &mut self.data + } +} + +#[cfg(unix)] +impl From for MsgMeta { + fn from(data: msghdr) -> Self { + Self { data } + } +} + +#[cfg(unix)] +impl Deref for MsgMeta { + type Target = msghdr; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(unix)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +#[cfg(windows)] +impl From for MsgMeta { + fn from(data: WSAMSG) -> Self { + Self { data } + } +} + +#[cfg(windows)] +impl Deref for MsgMeta { + type Target = WSAMSG; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(windows)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 3da704b5..70a3ed5a 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -1,7 +1,7 @@ #[cfg(windows)] use {std::ops::Add, windows_sys::Win32::Networking::WinSock::WSABUF}; -use super::{IoVecBuf, IoVecBufMut}; +use super::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}; pub(crate) struct IoVecMeta { #[cfg(unix)] @@ -206,6 +206,44 @@ unsafe impl IoVecBufMut for IoVecMeta { } } +impl<'t, T: IoBuf> From<&'t T> for IoVecMeta { + fn from(buf: &'t T) -> Self { + let ptr = buf.read_ptr() as *const _ as *mut _; + let len = buf.bytes_init() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + +impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta { + fn from(buf: &'t mut T) -> Self { + let ptr = buf.write_ptr() as *mut _; + let len = buf.bytes_total() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + #[cfg(unix)] #[cfg(test)] mod tests { diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 3d9cc79a..d3a374ea 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -2,16 +2,20 @@ use mio::Token; use crate::driver::unpark::Unpark; -#[cfg(unix)] pub(crate) struct EventWaker { // raw waker + #[cfg(unix)] waker: mio::Waker, + #[cfg(windows)] + poll: std::sync::Arc, + #[cfg(windows)] + token: Token, // Atomic awake status pub(crate) awake: std::sync::atomic::AtomicBool, } -#[cfg(unix)] impl EventWaker { + #[cfg(unix)] pub(crate) fn new(registry: &mio::Registry, token: Token) -> std::io::Result { Ok(Self { waker: mio::Waker::new(registry, token)?, @@ -19,26 +23,7 @@ impl EventWaker { }) } - pub(crate) fn wake(&self) -> std::io::Result<()> { - // Skip wake if already awake - if self.awake.load(std::sync::atomic::Ordering::Acquire) { - return Ok(()); - } - self.waker.wake() - } -} - -#[cfg(windows)] -pub(crate) struct EventWaker { - // raw waker - poll: std::sync::Arc, - token: Token, - // Atomic awake status - pub(crate) awake: std::sync::atomic::AtomicBool, -} - -#[cfg(windows)] -impl EventWaker { + #[cfg(windows)] pub(crate) fn new( poll: std::sync::Arc, token: Token, @@ -51,14 +36,19 @@ impl EventWaker { } pub(crate) fn wake(&self) -> std::io::Result<()> { - use polling::os::iocp::PollerIocpExt; // Skip wake if already awake if self.awake.load(std::sync::atomic::Ordering::Acquire) { return Ok(()); } - self.poll.post(polling::os::iocp::CompletionPacket::new( + #[cfg(unix)] + let r = self.waker.wake(); + #[cfg(windows)] + use polling::os::iocp::PollerIocpExt; + #[cfg(windows)] + let r = self.poll.post(polling::os::iocp::CompletionPacket::new( polling::Event::readable(self.token.0), - )) + )); + r } } diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 07e5f25e..866f19a2 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -1,13 +1,32 @@ -use std::{io, net::SocketAddr}; +#[cfg(all(windows, feature = "unstable"))] +use std::sync::LazyLock; +use std::{ + io, + mem::{transmute, MaybeUninit}, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(all(windows, not(feature = "unstable")))] +use once_cell::sync::Lazy as LazyLock; +#[cfg(windows)] +use windows_sys::{ + core::GUID, + Win32::{ + Networking::WinSock::{ + WSAGetLastError, WSAIoctl, AF_INET, AF_INET6, INVALID_SOCKET, LPFN_WSARECVMSG, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, + SOCKADDR_IN as sockaddr_in, SOCKADDR_IN6 as sockaddr_in6, + SOCKADDR_STORAGE as sockaddr_storage, SOCKET, SOCKET_ERROR, WSAID_WSARECVMSG, WSAMSG, + }, + System::IO::OVERLAPPED, + }, +}; #[cfg(unix)] use { crate::net::unix::SocketAddr as UnixSocketAddr, - libc::{socklen_t, AF_INET, AF_INET6}, - std::mem::{transmute, MaybeUninit}, - std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, + libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6}, }; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { @@ -20,7 +39,10 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBufMut, BufResult}; +use crate::{ + buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Recv { /// Holds a strong ref to the FD, preventing the file from being closed @@ -112,31 +134,31 @@ pub(crate) struct RecvMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + let mut info: Box<(MaybeUninit, IoVecMeta, MsgMeta)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + info.2.name = &mut info.0 as *mut _ as *mut SOCKADDR; + info.2.namelen = std::mem::size_of::() as _; + } Op::submit_with(RecvMsg { fd, buf, info }) } @@ -150,27 +172,32 @@ impl Op> { let storage = unsafe { complete.data.info.0.assume_init() }; let addr = unsafe { - match storage.ss_family as libc::c_int { + match storage.ss_family as _ { AF_INET => { // Safety: if the ss_family field is AF_INET then storage must be a // sockaddr_in. - let addr: &libc::sockaddr_in = transmute(&storage); + let addr: &sockaddr_in = transmute(&storage); + #[cfg(unix)] let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + #[cfg(windows)] + let ip = Ipv4Addr::from(addr.sin_addr.S_un.S_addr.to_ne_bytes()); let port = u16::from_be(addr.sin_port); SocketAddr::V4(SocketAddrV4::new(ip, port)) } AF_INET6 => { // Safety: if the ss_family field is AF_INET6 then storage must be a // sockaddr_in6. - let addr: &libc::sockaddr_in6 = transmute(&storage); + let addr: &sockaddr_in6 = transmute(&storage); + #[cfg(unix)] let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + #[cfg(windows)] + let ip = Ipv6Addr::from(addr.sin6_addr.u.Byte); let port = u16::from_be(addr.sin6_port); - SocketAddr::V6(SocketAddrV6::new( - ip, - port, - addr.sin6_flowinfo, - addr.sin6_scope_id, - )) + #[cfg(unix)] + let scope_id = addr.sin6_scope_id; + #[cfg(windows)] + let scope_id = addr.Anonymous.sin6_scope_id; + SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)) } _ => { unreachable!() @@ -179,9 +206,7 @@ impl Op> { }; // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } + unsafe { buf.set_init(n) }; (n, addr) }); @@ -189,22 +214,42 @@ impl Op> { } } +/// see https://github.com/microsoft/windows-rs/issues/2530 #[cfg(windows)] -impl Op> { - #[allow(unused_mut, unused_variables)] - pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { - unimplemented!() +static WSA_RECV_MSG: LazyLock< + unsafe extern "system" fn( + SOCKET, + *mut WSAMSG, + *mut u32, + *mut OVERLAPPED, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, + ) -> i32, +> = LazyLock::new(|| unsafe { + let mut wsa_recv_msg: LPFN_WSARECVMSG = None; + let mut dw_bytes = 0; + let r = WSAIoctl( + INVALID_SOCKET, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &WSAID_WSARECVMSG as *const _ as *const std::ffi::c_void, + std::mem::size_of:: as usize as u32, + &mut wsa_recv_msg as *mut _ as *mut std::ffi::c_void, + std::mem::size_of::() as _, + &mut dw_bytes, + std::ptr::null_mut(), + None, + ); + if r == SOCKET_ERROR || wsa_recv_msg.is_none() { + panic!("{}", io::Error::from_raw_os_error(WSAGetLastError())) + } else { + assert_eq!(dw_bytes, std::mem::size_of::() as _); + wsa_recv_msg.unwrap() } -} +}); impl OpAble for RecvMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { - opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() + opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut *self.info.2).build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] @@ -216,13 +261,27 @@ impl OpAble for RecvMsg { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut recved = 0; + let r = unsafe { + (LazyLock::force(&WSA_RECV_MSG))( + fd as _, + &mut *self.info.2, + &mut recved, + std::ptr::null_mut(), + None, + ) + }; + if r == SOCKET_ERROR { + unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) } + } else { + Ok(recved) + } } } @@ -235,30 +294,22 @@ pub(crate) struct RecvMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] impl Op> { pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsgUnix { fd, buf, info }) } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index def8ba34..f8e5c285 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -2,12 +2,12 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(unix)] -use {crate::net::unix::SocketAddr as UnixSocketAddr, socket2::SockAddr}; +use socket2::SockAddr; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::syscall, std::os::windows::io::AsRawSocket, - windows_sys::Win32::Networking::WinSock::send, + crate::syscall, + std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{send, WSASendMsg, SOCKET_ERROR}, }; #[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; @@ -15,7 +15,12 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBuf, BufResult}; +#[cfg(unix)] +use crate::net::unix::SocketAddr as UnixSocketAddr; +use crate::{ + buf::{IoBuf, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Send { /// Holds a strong ref to the FD, preventing the file from being closed @@ -123,37 +128,50 @@ pub(crate) struct SendMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn send_msg( fd: SharedFd, buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - - match info.0.as_ref() { - Some(socket_addr) => { - info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - info.2.msg_namelen = socket_addr.len(); + let mut info: Box<(Option, IoVecMeta, MsgMeta)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + info.2.msg_namelen = socket_addr.len(); + } + None => { + info.2.msg_name = std::ptr::null_mut(); + info.2.msg_namelen = 0; + } } - None => { - info.2.msg_name = std::ptr::null_mut(); - info.2.msg_namelen = 0; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.name = socket_addr.as_ptr() as *mut _; + info.2.namelen = socket_addr.len(); + } + None => { + info.2.name = std::ptr::null_mut(); + info.2.namelen = 0; + } } } @@ -168,28 +186,12 @@ impl Op> { } } -#[cfg(windows)] -impl Op> { - #[allow(unused_variables)] - pub(crate) fn send_msg( - fd: SharedFd, - buf: T, - socket_addr: Option, - ) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult { - unimplemented!() - } -} - impl OpAble for SendMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { #[allow(deprecated)] const FLAGS: u32 = libc::MSG_NOSIGNAL as u32; - opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _) + opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &*self.info.2) .flags(FLAGS) .build() } @@ -210,13 +212,28 @@ impl OpAble for SendMsg { #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) + syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut nsent = 0; + let ret = unsafe { + WSASendMsg( + fd as _, + &*self.info.2, + 0, + &mut nsent, + std::ptr::null_mut(), + None, + ) + }; + if ret == SOCKET_ERROR { + Err(io::Error::last_os_error()) + } else { + Ok(nsent) + } } } @@ -229,7 +246,8 @@ pub(crate) struct SendMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] @@ -239,17 +257,14 @@ impl Op> { buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(Option, IoVecMeta, libc::msghdr)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; match info.0.as_ref() { Some(socket_addr) => { diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index 38db6bb3..3e1aee0e 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -36,6 +36,7 @@ impl SocketAddr { return AddressKind::Unnamed; } let len = self.socklen as usize - offset; + #[allow(clippy::unnecessary_cast)] let path = unsafe { &*(&self.sockaddr.sun_path as *const [libc::c_char] as *const [u8]) }; // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses diff --git a/monoio/src/time/mod.rs b/monoio/src/time/mod.rs index 1d1733c8..008a7133 100644 --- a/monoio/src/time/mod.rs +++ b/monoio/src/time/mod.rs @@ -64,19 +64,13 @@ //! seconds. //! //! ``` -//! #[cfg(windows)] -//! fn main() {} -//! -//! #[cfg(unix)] //! use monoio::time; //! -//! #[cfg(unix)] //! async fn task_that_takes_a_second() { //! println!("hello"); //! time::sleep(time::Duration::from_secs(1)).await //! } //! -//! #[cfg(unix)] //! #[monoio::main(timer_enabled = true)] //! async fn main() { //! let mut interval = time::interval(time::Duration::from_secs(2)); From ec8f38ee90b9bf1fe0d5f29e0b97f651d7922569 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Fri, 23 Feb 2024 00:04:44 +0800 Subject: [PATCH 19/19] fix listener --- monoio/src/net/tcp/listener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index cb76ec47..2f2649e1 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -34,7 +34,7 @@ impl TcpListener { #[cfg(unix)] let sys_listener = unsafe { std::net::TcpListener::from_raw_fd(fd.raw_fd()) }; #[cfg(windows)] - let sys_listener = unsafe { std::net::TcpListener::from_raw_socket(todo!()) }; + let sys_listener = unsafe { std::net::TcpListener::from_raw_socket(fd.raw_socket()) }; Self { fd, sys_listener: Some(sys_listener),