Skip to content

Commit

Permalink
aio: add recvmsg / sendmsg
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jun 26, 2024
1 parent b38fe3d commit 42069b3
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 94 deletions.
38 changes: 38 additions & 0 deletions src/aio/IoUring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub inline fn isSupported(op_types: []const type) bool {
.connect => std.os.linux.IORING_OP.CONNECT, // 5.5
.recv => std.os.linux.IORING_OP.RECV, // 5.6
.send => std.os.linux.IORING_OP.SEND, // 5.6
.recv_msg => std.os.linux.IORING_OP.RECVMSG, // 5.3
.send_msg => std.os.linux.IORING_OP.SENDMSG, // 5.3
.open_at => std.os.linux.IORING_OP.OPENAT, // 5.15
.close_file, .close_dir, .close_socket, .close_event_source => std.os.linux.IORING_OP.CLOSE, // 5.15
.timeout => std.os.linux.IORING_OP.TIMEOUT, // 5.4
Expand Down Expand Up @@ -236,6 +238,8 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai
.connect => try io.connect(user_data, op.socket, op.addr, op.addrlen),
.recv => try io.recv(user_data, op.socket, .{ .buffer = op.buffer }, 0),
.send => try io.send(user_data, op.socket, op.buffer, 0),
.recv_msg => try io.recvmsg(user_data, op.socket, op.out_msg, 0),
.send_msg => try io.sendmsg(user_data, op.socket, op.msg, 0),
.open_at => try io.openat(user_data, op.dir.fd, op.path, posix.convertOpenFlags(op.flags), 0),
.close_file => try io.close(user_data, op.file.handle),
.close_dir => try io.close(user_data, op.dir.fd),
Expand Down Expand Up @@ -439,6 +443,39 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
.NETDOWN => error.NetworkSubsystemFailed,
else => std.posix.unexpectedErrno(err),
},
.recv_msg => switch (err) {
.SUCCESS, .INTR, .INVAL, .FAULT, .AGAIN => unreachable,
.CANCELED => error.Canceled,
.BADF => unreachable, // always a race condition
.NOTCONN => error.SocketNotConnected,
.NOTSOCK => unreachable,
.NOMEM => error.SystemResources,
.CONNREFUSED => error.ConnectionRefused,
.CONNRESET => error.ConnectionResetByPeer,
.TIMEDOUT => error.ConnectionTimedOut,
else => std.posix.unexpectedErrno(err),
},
.send_msg => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.CANCELED => error.Canceled,
.ACCES => error.AccessDenied,
.ALREADY => error.FastOpenAlreadyInProgress,
.BADF => unreachable, // always a race condition
.CONNRESET => error.ConnectionResetByPeer,
.DESTADDRREQ => unreachable, // The socket is not connection-mode, and no peer address is set.
.FAULT => unreachable, // An invalid user space address was specified for an argument.
.ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified
.MSGSIZE => error.MessageTooBig,
.NOBUFS => error.SystemResources,
.NOMEM => error.SystemResources,
.NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
.OPNOTSUPP => unreachable, // Some bit in the flags argument is inappropriate for the socket type.
.PIPE => error.BrokenPipe,
.HOSTUNREACH => error.NetworkUnreachable,
.NETUNREACH => error.NetworkUnreachable,
.NETDOWN => error.NetworkSubsystemFailed,
else => std.posix.unexpectedErrno(err),
},
.open_at => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.CANCELED => error.Canceled,
Expand Down Expand Up @@ -618,6 +655,7 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
.send => if (op.out_written) |w| {
w.* = @intCast(cqe.res);
},
.recv_msg, .send_msg => {},
.open_at => op.out_file.handle = cqe.res,
.close_file, .close_dir, .close_socket => {},
.notify_event_source, .wait_event_source, .close_event_source => {},
Expand Down
207 changes: 114 additions & 93 deletions src/aio/common/posix.zig
Original file line number Diff line number Diff line change
Expand Up @@ -157,104 +157,125 @@ fn write(op: anytype) !void {
}

pub inline fn perform(op: anytype, readiness: Readiness) Operation.Error!void {
switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
.fsync => _ = try std.posix.fsync(op.file.handle),
.read => read(op) catch |err| switch (err) {
error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForReading else e,
else => |e| return e,
},
.write => write(op) catch |err| switch (err) {
error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForWriting else e,
else => |e| return e,
},
.accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0),
.connect => _ = try std.posix.connect(op.socket, op.addr, op.addrlen),
.recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0),
.send => {
const written = try std.posix.send(op.socket, op.buffer, 0);
if (op.out_written) |w| w.* = written;
},
.open_at => if (builtin.target.os.tag == .windows) {
op.out_file.* = try op.dir.openFileZ(op.path, op.flags);
} else {
op.out_file.handle = try std.posix.openatZ(op.dir.fd, op.path, convertOpenFlags(op.flags), 0);
},
.close_file => std.posix.close(op.file.handle),
.close_dir => std.posix.close(op.dir.fd),
.rename_at => {
if (@hasDecl(std.posix.system, "renameat2")) {
const res = std.posix.system.renameat2(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE);
const e = std.posix.errno(res);
while (true) { // for handling INTR
switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
.fsync => _ = try std.posix.fsync(op.file.handle),
.read => read(op) catch |err| switch (err) {
error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForReading else e,
else => |e| return e,
},
.write => write(op) catch |err| switch (err) {
error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForWriting else e,
else => |e| return e,
},
.accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0),
.connect => _ = try std.posix.connect(op.socket, op.addr, op.addrlen),
.recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0),
.send => {
const written = try std.posix.send(op.socket, op.buffer, 0);
if (op.out_written) |w| w.* = written;
},
.recv_msg => {
const recvmsg = if (@hasDecl(std.posix.system, "recvmsg"))
std.posix.system.recvmsg
else
std.c.recvmsg;
const e = std.posix.errno(recvmsg(op.socket, op.out_msg, 0));
if (e != .SUCCESS) return switch (e) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.CANCELED => error.Canceled,
.ACCES => error.AccessDenied,
.PERM => error.AccessDenied,
.BUSY => error.FileBusy,
.DQUOT => error.DiskQuota,
.FAULT => unreachable,
.ISDIR => error.IsDir,
.LOOP => error.SymLinkLoop,
.MLINK => error.LinkQuotaExceeded,
.NAMETOOLONG => error.NameTooLong,
.NOENT => error.FileNotFound,
.NOTDIR => error.NotDir,
.SUCCESS, .INVAL, .BADF, .NOTSOCK => unreachable,
.INTR, .AGAIN => continue,
.CONNREFUSED => error.ConnectionRefused,
.FAULT => error.Unexpected,
.NOMEM => error.SystemResources,
.NOSPC => error.NoSpaceLeft,
.EXIST => error.PathAlreadyExists,
.NOTEMPTY => error.PathAlreadyExists,
.ROFS => error.ReadOnlyFileSystem,
.XDEV => error.RenameAcrossMountPoints,
.NOTCONN => error.SocketNotConnected,
else => std.posix.unexpectedErrno(e),
};
},
.send_msg => _ = try std.posix.sendmsg(op.socket, op.msg, 0),
.open_at => if (builtin.target.os.tag == .windows) {
op.out_file.* = try op.dir.openFileZ(op.path, op.flags);
} else {
// this is racy :(
if (builtin.target.os.tag == .windows) {
// access is weird on windows
if (op.new_dir.openFileZ(op.new_path, .{ .mode = .read_write })) |f| {
f.close();
return error.PathAlreadyExists;
} else |err| switch (err) {
error.FileNotFound => {}, // ok
else => |e| return e,
}
op.out_file.handle = try std.posix.openatZ(op.dir.fd, op.path, convertOpenFlags(op.flags), 0);
},
.close_file => std.posix.close(op.file.handle),
.close_dir => std.posix.close(op.dir.fd),
.rename_at => {
if (@hasDecl(std.posix.system, "renameat2")) {
const res = std.posix.system.renameat2(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE);
const e = std.posix.errno(res);
if (e != .SUCCESS) return switch (e) {
.SUCCESS, .INVAL => unreachable,
.INTR, .AGAIN => continue,
.CANCELED => error.Canceled,
.ACCES => error.AccessDenied,
.PERM => error.AccessDenied,
.BUSY => error.FileBusy,
.DQUOT => error.DiskQuota,
.FAULT => unreachable,
.ISDIR => error.IsDir,
.LOOP => error.SymLinkLoop,
.MLINK => error.LinkQuotaExceeded,
.NAMETOOLONG => error.NameTooLong,
.NOENT => error.FileNotFound,
.NOTDIR => error.NotDir,
.NOMEM => error.SystemResources,
.NOSPC => error.NoSpaceLeft,
.EXIST => error.PathAlreadyExists,
.NOTEMPTY => error.PathAlreadyExists,
.ROFS => error.ReadOnlyFileSystem,
.XDEV => error.RenameAcrossMountPoints,
else => std.posix.unexpectedErrno(e),
};
} else {
if (op.new_dir.accessZ(op.new_path, .{ .mode = .read_write })) {
return error.PathAlreadyExists;
} else |err| switch (err) {
error.FileNotFound => {}, // ok
else => |e| return e,
// this is racy :(
if (builtin.target.os.tag == .windows) {
// access is weird on windows
if (op.new_dir.openFileZ(op.new_path, .{ .mode = .read_write })) |f| {
f.close();
return error.PathAlreadyExists;
} else |err| switch (err) {
error.FileNotFound => {}, // ok
else => |e| return e,
}
} else {
if (op.new_dir.accessZ(op.new_path, .{ .mode = .read_write })) {
return error.PathAlreadyExists;
} else |err| switch (err) {
error.FileNotFound => {}, // ok
else => |e| return e,
}
}
try std.posix.renameatZ(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path);
}
try std.posix.renameatZ(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path);
}
},
.unlink_at => try std.posix.unlinkatZ(op.dir.fd, op.path, 0),
.mkdir_at => try std.posix.mkdiratZ(op.dir.fd, op.path, op.mode),
.symlink_at => if (builtin.target.os.tag == .windows) {
try op.dir.symLinkZ(op.target, op.link_path, .{});
} else {
try std.posix.symlinkatZ(op.target, op.dir.fd, op.link_path);
},
.child_exit => {
if (builtin.target.os.tag == .windows) {
@panic("fixme");
} else if (comptime @hasDecl(std.posix.system, "waitid")) {
var siginfo: std.posix.siginfo_t = undefined;
_ = std.posix.system.waitid(.PIDFD, readiness.fd, &siginfo, std.posix.W.EXITED | std.posix.W.NOHANG);
if (op.out_term) |term| term.* = statusToTerm(@intCast(siginfo.fields.common.second.sigchld.status));
},
.unlink_at => try std.posix.unlinkatZ(op.dir.fd, op.path, 0),
.mkdir_at => try std.posix.mkdiratZ(op.dir.fd, op.path, op.mode),
.symlink_at => if (builtin.target.os.tag == .windows) {
try op.dir.symLinkZ(op.target, op.link_path, .{});
} else {
const res = std.posix.waitpid(op.child, std.posix.W.NOHANG);
if (op.out_term) |term| term.* = statusToTerm(res.status);
}
},
.socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol),
.close_socket => std.posix.close(op.socket),
.notify_event_source => op.source.notify(),
.wait_event_source => op.source.wait(),
.close_event_source => op.source.deinit(),
// this function is meant for execution on a thread, it makes no sense to execute these on a thread
.nop, .timeout, .link_timeout, .cancel => unreachable,
try std.posix.symlinkatZ(op.target, op.dir.fd, op.link_path);
},
.child_exit => {
if (builtin.target.os.tag == .windows) {
@panic("fixme");
} else if (comptime @hasDecl(std.posix.system, "waitid")) {
var siginfo: std.posix.siginfo_t = undefined;
_ = std.posix.system.waitid(.PIDFD, readiness.fd, &siginfo, std.posix.W.EXITED | std.posix.W.NOHANG);
if (op.out_term) |term| term.* = statusToTerm(@intCast(siginfo.fields.common.second.sigchld.status));
} else {
const res = std.posix.waitpid(op.child, std.posix.W.NOHANG);
if (op.out_term) |term| term.* = statusToTerm(res.status);
}
},
.socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol),
.close_socket => std.posix.close(op.socket),
.notify_event_source => op.source.notify(),
.wait_event_source => op.source.wait(),
.close_event_source => op.source.deinit(),
// this function is meant for execution on a thread, it makes no sense to execute these on a thread
.nop, .timeout, .link_timeout, .cancel => unreachable,
}
break;
}
}

Expand Down Expand Up @@ -287,9 +308,9 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness {
}
break :blk .{ .fd = op.file.handle, .mode = .in };
},
.accept, .recv => .{ .fd = op.socket, .mode = .in },
.accept, .recv, .recv_msg => .{ .fd = op.socket, .mode = .in },
.socket, .connect => .{},
.send => .{ .fd = op.socket, .mode = .out },
.send, .send_msg => .{ .fd = op.socket, .mode = .out },
.open_at, .close_file, .close_dir, .close_socket => .{},
.timeout, .link_timeout => blk: {
if (builtin.target.os.tag == .windows) {
Expand Down Expand Up @@ -397,7 +418,7 @@ pub inline fn armReadiness(op: anytype, readiness: Readiness) ArmReadinessError!
},
.nop => {},
.fsync, .read, .write => {},
.socket, .accept, .connect, .recv, .send => {},
.socket, .accept, .connect, .recv, .send, .recv_msg, .send_msg => {},
.open_at, .close_file, .close_dir, .close_socket => {},
.cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => {},
.notify_event_source, .wait_event_source, .close_event_source => {},
Expand All @@ -410,7 +431,7 @@ pub inline fn closeReadiness(op: anytype, readiness: Readiness) void {
.timeout, .link_timeout, .child_exit => true,
.nop => false,
.fsync, .read, .write => false,
.socket, .accept, .connect, .recv, .send => false,
.socket, .accept, .connect, .recv, .send, .recv_msg, .send_msg => false,
.open_at, .close_file, .close_dir, .close_socket => false,
.cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => false,
.notify_event_source, .wait_event_source, .close_event_source => false,
Expand Down
32 changes: 31 additions & 1 deletion src/aio/ops.zig
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,33 @@ pub const Send = struct {
userdata: usize = 0,
};

// TODO: recvmsg, sendmsg
/// recvmsg(2)
pub const RecvMsg = struct {
pub const Error = error{
ConnectionRefused,
ConnectionTimedOut,
ConnectionResetByPeer,
SystemResources,
SocketNotConnected,
} || SharedError;
socket: std.posix.socket_t,
out_msg: *std.posix.msghdr,
out_id: ?*Id = null,
out_error: ?*Error = null,
link: Link = .unlinked,
userdata: usize = 0,
};

/// std.posix.sendmsg
pub const SendMsg = struct {
pub const Error = std.posix.SendMsgError || SharedError;
socket: std.posix.socket_t,
msg: *const std.posix.msghdr_const,
out_id: ?*Id = null,
out_error: ?*Error = null,
link: Link = .unlinked,
userdata: usize = 0,
};

/// std.fs.Dir.openFile
pub const OpenAt = struct {
Expand Down Expand Up @@ -312,6 +338,8 @@ pub const Operation = enum {
connect,
recv,
send,
recv_msg,
send_msg,
open_at,
close_file,
close_dir,
Expand All @@ -338,6 +366,8 @@ pub const Operation = enum {
.connect = Connect,
.recv = Recv,
.send = Send,
.recv_msg = RecvMsg,
.send_msg = SendMsg,
.open_at = OpenAt,
.close_file = CloseFile,
.close_dir = CloseDir,
Expand Down

0 comments on commit 42069b3

Please sign in to comment.