From 52ff11ba509a5d1c92af9af0392019d340c33272 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Thu, 22 Feb 2024 21:51:25 +0800 Subject: [PATCH] impl recv --- monoio/Cargo.toml | 2 +- monoio/src/driver/op/recv.rs | 143 +++++++++++++++++++++++++---------- 2 files changed, 104 insertions(+), 41 deletions(-) diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 3bd90672..af0af105 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -79,7 +79,7 @@ utils = ["nix"] # enable debug if you want to know what runtime does debug = ["tracing"] # enable legacy driver support(will make monoio available for older kernel and macOS) -legacy = ["mio", "polling"] +legacy = ["mio", "polling", "once_cell"] # iouring support iouring = ["io-uring"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 2199fd7f..c93833b9 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -1,14 +1,32 @@ -use std::{io, net::SocketAddr}; +#[cfg(all(windows, feature = "unstable"))] +use std::sync::LazyLock; +use std::{ + io, + mem::{transmute, MaybeUninit}, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(all(windows, not(feature = "unstable")))] +use once_cell::sync::Lazy as LazyLock; +#[cfg(windows)] +use windows_sys::{ + core::GUID, + Win32::{ + Networking::WinSock::{ + WSAGetLastError, WSAIoctl, AF_INET, AF_INET6, INVALID_SOCKET, LPFN_WSARECVMSG, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, + SOCKADDR_IN as sockaddr_in, SOCKADDR_IN6 as sockaddr_in6, + SOCKADDR_STORAGE as sockaddr_storage, SOCKET, SOCKET_ERROR, WSAID_WSARECVMSG, WSAMSG, + }, + System::IO::OVERLAPPED, + }, +}; #[cfg(unix)] use { - crate::buf::{IoVecBufMut, IoVecMeta, MsgMeta}, crate::net::unix::SocketAddr as UnixSocketAddr, - libc::{socklen_t, AF_INET, AF_INET6}, - std::mem::{transmute, MaybeUninit}, - std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, + libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6}, }; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { @@ -19,9 +37,9 @@ use { use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; +use crate::buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; -use crate::{buf::IoBufMut, BufResult}; +use crate::{driver::ready::Direction, BufResult}; pub(crate) struct Recv { /// Holds a strong ref to the FD, preventing the file from being closed @@ -114,22 +132,30 @@ pub(crate) struct RecvMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, /// For multiple message recv in the future - #[cfg(unix)] - pub(crate) info: Box<(MaybeUninit, IoVecMeta, MsgMeta)>, + pub(crate) info: Box<(MaybeUninit, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - let mut info: Box<(MaybeUninit, IoVecMeta, MsgMeta)> = + let mut info: Box<(MaybeUninit, IoVecMeta, MsgMeta)> = Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { std::mem::zeroed() })); - info.2.msg_iov = info.1.write_iovec_ptr(); - info.2.msg_iovlen = info.1.write_iovec_len() as _; - info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + info.2.name = &mut info.0 as *mut _ as *mut SOCKADDR; + info.2.namelen = std::mem::size_of::() as _; + } Op::submit_with(RecvMsg { fd, buf, info }) } @@ -143,27 +169,32 @@ impl Op> { let storage = unsafe { complete.data.info.0.assume_init() }; let addr = unsafe { - match storage.ss_family as libc::c_int { + match storage.ss_family as _ { AF_INET => { // Safety: if the ss_family field is AF_INET then storage must be a // sockaddr_in. - let addr: &libc::sockaddr_in = transmute(&storage); + let addr: &sockaddr_in = transmute(&storage); + #[cfg(unix)] let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + #[cfg(windows)] + let ip = Ipv4Addr::from(addr.sin_addr.S_un.S_addr.to_ne_bytes()); let port = u16::from_be(addr.sin_port); SocketAddr::V4(SocketAddrV4::new(ip, port)) } AF_INET6 => { // Safety: if the ss_family field is AF_INET6 then storage must be a // sockaddr_in6. - let addr: &libc::sockaddr_in6 = transmute(&storage); + let addr: &sockaddr_in6 = transmute(&storage); + #[cfg(unix)] let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + #[cfg(windows)] + let ip = Ipv6Addr::from(addr.sin6_addr.u.Byte); let port = u16::from_be(addr.sin6_port); - SocketAddr::V6(SocketAddrV6::new( - ip, - port, - addr.sin6_flowinfo, - addr.sin6_scope_id, - )) + #[cfg(unix)] + let scope_id = addr.sin6_scope_id; + #[cfg(windows)] + let scope_id = addr.Anonymous.sin6_scope_id; + SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)) } _ => { unreachable!() @@ -180,17 +211,37 @@ impl Op> { } } +/// see https://github.com/microsoft/windows-rs/issues/2530 #[cfg(windows)] -impl Op> { - #[allow(unused_mut, unused_variables)] - pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - unimplemented!() +static WSA_RECV_MSG: LazyLock< + unsafe extern "system" fn( + SOCKET, + *mut WSAMSG, + *mut u32, + *mut OVERLAPPED, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, + ) -> i32, +> = LazyLock::new(|| unsafe { + let mut wsa_recv_msg: LPFN_WSARECVMSG = None; + let mut dw_bytes = 0; + let r = WSAIoctl( + INVALID_SOCKET, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &WSAID_WSARECVMSG as *const _ as *const std::ffi::c_void, + std::mem::size_of:: as usize as u32, + &mut wsa_recv_msg as *mut _ as *mut std::ffi::c_void, + std::mem::size_of::() as _, + &mut dw_bytes, + std::ptr::null_mut(), + None, + ); + if r == SOCKET_ERROR || wsa_recv_msg.is_none() { + panic!("{}", io::Error::from_raw_os_error(WSAGetLastError())) + } else { + assert_eq!(dw_bytes, std::mem::size_of::() as _); + wsa_recv_msg.unwrap() } - - pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { - unimplemented!() - } -} +}); impl OpAble for RecvMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -212,10 +263,22 @@ impl OpAble for RecvMsg { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - // see https://github.com/microsoft/windows-rs/issues/2530 - // need get LPFN_WSARECVMSG by WSAIoctl, then use the function pointer to invoke - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut recved = 0; + let r = unsafe { + (LazyLock::force(&WSA_RECV_MSG))( + fd as _, + &mut *self.info.2, + &mut recved, + std::ptr::null_mut(), + None, + ) + }; + if r == SOCKET_ERROR { + unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) } + } else { + Ok(recved) + } } } @@ -229,13 +292,13 @@ pub(crate) struct RecvMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, /// For multiple message recv in the future - pub(crate) info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)>, + pub(crate) info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] impl Op> { pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { - let mut info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)> = + let mut info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)> = Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { std::mem::zeroed() })); @@ -243,7 +306,7 @@ impl Op> { info.2.msg_iov = info.1.write_iovec_ptr(); info.2.msg_iovlen = info.1.write_iovec_len() as _; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsgUnix { fd, buf, info }) }