Skip to content

Commit

Permalink
impl recv
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Feb 22, 2024
1 parent 18d2e61 commit 0b98ba3
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 41 deletions.
2 changes: 1 addition & 1 deletion monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ utils = ["nix"]
# enable debug if you want to know what runtime does
debug = ["tracing"]
# enable legacy driver support(will make monoio available for older kernel and macOS)
legacy = ["mio", "polling"]
legacy = ["mio", "polling", "once_cell"]
# iouring support
iouring = ["io-uring"]
# tokio-compatible(only have effect when legacy is enabled and iouring is not)
Expand Down
146 changes: 106 additions & 40 deletions monoio/src/driver/op/recv.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
use std::{io, net::SocketAddr};
#[cfg(all(windows, feature = "unstable"))]
use std::sync::LazyLock;
use std::{
io,
mem::{transmute, MaybeUninit},
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
};

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(all(windows, not(feature = "unstable")))]
use once_cell::sync::Lazy as LazyLock;
#[cfg(windows)]
use windows_sys::{
core::GUID,
Win32::{
Networking::WinSock::{
WSAGetLastError, WSAIoctl, AF_INET, AF_INET6, INVALID_SOCKET, LPFN_WSARECVMSG,
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR,
SOCKADDR_IN as sockaddr_in, SOCKADDR_IN6 as sockaddr_in6,
SOCKADDR_STORAGE as sockaddr_storage, SOCKET, SOCKET_ERROR, WSAID_WSARECVMSG, WSAMSG,
},
System::IO::OVERLAPPED,
},
};
#[cfg(unix)]
use {
crate::buf::{IoVecBufMut, IoVecMeta, MsgMeta},
crate::net::unix::SocketAddr as UnixSocketAddr,
libc::{socklen_t, AF_INET, AF_INET6},
std::mem::{transmute, MaybeUninit},
std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6},
};
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use {
Expand All @@ -20,8 +38,11 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
use crate::{buf::IoBufMut, BufResult};
use crate::{
buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta},
driver::ready::Direction,
BufResult,
};

pub(crate) struct Recv<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
Expand Down Expand Up @@ -114,22 +135,30 @@ pub(crate) struct RecvMsg<T> {
/// Reference to the in-flight buffer.
pub(crate) buf: T,
/// For multiple message recv in the future
#[cfg(unix)]
pub(crate) info: Box<(MaybeUninit<libc::sockaddr_storage>, IoVecMeta, MsgMeta)>,
pub(crate) info: Box<(MaybeUninit<sockaddr_storage>, IoVecMeta, MsgMeta)>,
}

#[cfg(unix)]
impl<T: IoBufMut> Op<RecvMsg<T>> {
pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result<Self> {
let mut info: Box<(MaybeUninit<libc::sockaddr_storage>, IoVecMeta, MsgMeta)> =
let mut info: Box<(MaybeUninit<sockaddr_storage>, IoVecMeta, MsgMeta)> =
Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe {
std::mem::zeroed()
}));

info.2.msg_iov = info.1.write_iovec_ptr();
info.2.msg_iovlen = info.1.write_iovec_len() as _;
info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as socklen_t;
#[cfg(unix)]
{
info.2.msg_iov = info.1.write_iovec_ptr();
info.2.msg_iovlen = info.1.write_iovec_len() as _;
info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
info.2.msg_namelen = std::mem::size_of::<sockaddr_storage>() as socklen_t;
}
#[cfg(windows)]
{
info.2.lpBuffers = info.1.write_wsabuf_ptr();
info.2.dwBufferCount = info.1.write_wsabuf_len() as _;
info.2.name = &mut info.0 as *mut _ as *mut SOCKADDR;
info.2.namelen = std::mem::size_of::<sockaddr_storage>() as _;
}

Op::submit_with(RecvMsg { fd, buf, info })
}
Expand All @@ -143,27 +172,32 @@ impl<T: IoBufMut> Op<RecvMsg<T>> {
let storage = unsafe { complete.data.info.0.assume_init() };

let addr = unsafe {
match storage.ss_family as libc::c_int {
match storage.ss_family as _ {
AF_INET => {
// Safety: if the ss_family field is AF_INET then storage must be a
// sockaddr_in.
let addr: &libc::sockaddr_in = transmute(&storage);
let addr: &sockaddr_in = transmute(&storage);
#[cfg(unix)]
let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
#[cfg(windows)]
let ip = Ipv4Addr::from(addr.sin_addr.S_un.S_addr.to_ne_bytes());
let port = u16::from_be(addr.sin_port);
SocketAddr::V4(SocketAddrV4::new(ip, port))
}
AF_INET6 => {
// Safety: if the ss_family field is AF_INET6 then storage must be a
// sockaddr_in6.
let addr: &libc::sockaddr_in6 = transmute(&storage);
let addr: &sockaddr_in6 = transmute(&storage);
#[cfg(unix)]
let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr);
#[cfg(windows)]
let ip = Ipv6Addr::from(addr.sin6_addr.u.Byte);
let port = u16::from_be(addr.sin6_port);
SocketAddr::V6(SocketAddrV6::new(
ip,
port,
addr.sin6_flowinfo,
addr.sin6_scope_id,
))
#[cfg(unix)]
let scope_id = addr.sin6_scope_id;
#[cfg(windows)]
let scope_id = addr.Anonymous.sin6_scope_id;
SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id))
}
_ => {
unreachable!()
Expand All @@ -180,17 +214,37 @@ impl<T: IoBufMut> Op<RecvMsg<T>> {
}
}

/// see https://github.com/microsoft/windows-rs/issues/2530
#[cfg(windows)]
impl<T: IoBufMut> Op<RecvMsg<T>> {
#[allow(unused_mut, unused_variables)]
pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result<Self> {
unimplemented!()
static WSA_RECV_MSG: LazyLock<
unsafe extern "system" fn(
SOCKET,
*mut WSAMSG,
*mut u32,
*mut OVERLAPPED,
LPWSAOVERLAPPED_COMPLETION_ROUTINE,
) -> i32,
> = LazyLock::new(|| unsafe {
let mut wsa_recv_msg: LPFN_WSARECVMSG = None;
let mut dw_bytes = 0;
let r = WSAIoctl(
INVALID_SOCKET,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&WSAID_WSARECVMSG as *const _ as *const std::ffi::c_void,
std::mem::size_of::<GUID> as usize as u32,
&mut wsa_recv_msg as *mut _ as *mut std::ffi::c_void,
std::mem::size_of::<LPFN_WSARECVMSG>() as _,
&mut dw_bytes,
std::ptr::null_mut(),
None,
);
if r == SOCKET_ERROR || wsa_recv_msg.is_none() {
panic!("{}", io::Error::from_raw_os_error(WSAGetLastError()))
} else {
assert_eq!(dw_bytes, std::mem::size_of::<LPFN_WSARECVMSG>() as _);
wsa_recv_msg.unwrap()
}

pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> {
unimplemented!()
}
}
});

impl<T: IoBufMut> OpAble for RecvMsg<T> {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Expand All @@ -212,10 +266,22 @@ impl<T: IoBufMut> OpAble for RecvMsg<T> {

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
let _fd = self.fd.as_raw_socket();
// see https://github.com/microsoft/windows-rs/issues/2530
// need get LPFN_WSARECVMSG by WSAIoctl, then use the function pointer to invoke
unimplemented!();
let fd = self.fd.as_raw_socket();
let mut recved = 0;
let r = unsafe {
(LazyLock::force(&WSA_RECV_MSG))(
fd as _,
&mut *self.info.2,
&mut recved,
std::ptr::null_mut(),
None,
)
};
if r == SOCKET_ERROR {
unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) }
} else {
Ok(recved)
}
}
}

Expand All @@ -229,21 +295,21 @@ pub(crate) struct RecvMsgUnix<T> {
/// Reference to the in-flight buffer.
pub(crate) buf: T,
/// For multiple message recv in the future
pub(crate) info: Box<(MaybeUninit<libc::sockaddr_storage>, IoVecMeta, libc::msghdr)>,
pub(crate) info: Box<(MaybeUninit<sockaddr_storage>, IoVecMeta, libc::msghdr)>,
}

#[cfg(unix)]
impl<T: IoBufMut> Op<RecvMsgUnix<T>> {
pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result<Self> {
let mut info: Box<(MaybeUninit<libc::sockaddr_storage>, IoVecMeta, libc::msghdr)> =
let mut info: Box<(MaybeUninit<sockaddr_storage>, IoVecMeta, libc::msghdr)> =
Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe {
std::mem::zeroed()
}));

info.2.msg_iov = info.1.write_iovec_ptr();
info.2.msg_iovlen = info.1.write_iovec_len() as _;
info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as socklen_t;
info.2.msg_namelen = std::mem::size_of::<sockaddr_storage>() as socklen_t;

Op::submit_with(RecvMsgUnix { fd, buf, info })
}
Expand Down

0 comments on commit 0b98ba3

Please sign in to comment.