diff --git a/src/aio/IoUring.zig b/src/aio/IoUring.zig index f2589ff..50502a0 100644 --- a/src/aio/IoUring.zig +++ b/src/aio/IoUring.zig @@ -45,6 +45,8 @@ pub inline fn isSupported(op_types: []const type) bool { .connect => std.os.linux.IORING_OP.CONNECT, // 5.5 .recv => std.os.linux.IORING_OP.RECV, // 5.6 .send => std.os.linux.IORING_OP.SEND, // 5.6 + .recv_msg => std.os.linux.IORING_OP.RECVMSG, // 5.3 + .send_msg => std.os.linux.IORING_OP.SENDMSG, // 5.3 .open_at => std.os.linux.IORING_OP.OPENAT, // 5.15 .close_file, .close_dir, .close_socket, .close_event_source => std.os.linux.IORING_OP.CLOSE, // 5.15 .timeout => std.os.linux.IORING_OP.TIMEOUT, // 5.4 @@ -236,6 +238,8 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai .connect => try io.connect(user_data, op.socket, op.addr, op.addrlen), .recv => try io.recv(user_data, op.socket, .{ .buffer = op.buffer }, 0), .send => try io.send(user_data, op.socket, op.buffer, 0), + .recv_msg => try io.recvmsg(user_data, op.socket, op.out_msg, 0), + .send_msg => try io.sendmsg(user_data, op.socket, op.msg, 0), .open_at => try io.openat(user_data, op.dir.fd, op.path, posix.convertOpenFlags(op.flags), 0), .close_file => try io.close(user_data, op.file.handle), .close_dir => try io.close(user_data, op.dir.fd), @@ -439,6 +443,39 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) .NETDOWN => error.NetworkSubsystemFailed, else => std.posix.unexpectedErrno(err), }, + .recv_msg => switch (err) { + .SUCCESS, .INTR, .INVAL, .FAULT, .AGAIN => unreachable, + .CANCELED => error.Canceled, + .BADF => unreachable, // always a race condition + .NOTCONN => error.SocketNotConnected, + .NOTSOCK => unreachable, + .NOMEM => error.SystemResources, + .CONNREFUSED => error.ConnectionRefused, + .CONNRESET => error.ConnectionResetByPeer, + .TIMEDOUT => error.ConnectionTimedOut, + else => std.posix.unexpectedErrno(err), + }, + .send_msg => switch (err) { + .SUCCESS, .INTR, .INVAL, .AGAIN => unreachable, + .CANCELED => error.Canceled, + .ACCES => error.AccessDenied, + .ALREADY => error.FastOpenAlreadyInProgress, + .BADF => unreachable, // always a race condition + .CONNRESET => error.ConnectionResetByPeer, + .DESTADDRREQ => unreachable, // The socket is not connection-mode, and no peer address is set. + .FAULT => unreachable, // An invalid user space address was specified for an argument. + .ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified + .MSGSIZE => error.MessageTooBig, + .NOBUFS => error.SystemResources, + .NOMEM => error.SystemResources, + .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + .OPNOTSUPP => unreachable, // Some bit in the flags argument is inappropriate for the socket type. + .PIPE => error.BrokenPipe, + .HOSTUNREACH => error.NetworkUnreachable, + .NETUNREACH => error.NetworkUnreachable, + .NETDOWN => error.NetworkSubsystemFailed, + else => std.posix.unexpectedErrno(err), + }, .open_at => switch (err) { .SUCCESS, .INTR, .INVAL, .AGAIN => unreachable, .CANCELED => error.Canceled, @@ -618,6 +655,7 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) .send => if (op.out_written) |w| { w.* = @intCast(cqe.res); }, + .recv_msg, .send_msg => {}, .open_at => op.out_file.handle = cqe.res, .close_file, .close_dir, .close_socket => {}, .notify_event_source, .wait_event_source, .close_event_source => {}, diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig index 149bcd2..82b9065 100644 --- a/src/aio/common/posix.zig +++ b/src/aio/common/posix.zig @@ -157,104 +157,125 @@ fn write(op: anytype) !void { } pub inline fn perform(op: anytype, readiness: Readiness) Operation.Error!void { - switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { - .fsync => _ = try std.posix.fsync(op.file.handle), - .read => read(op) catch |err| switch (err) { - error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForReading else e, - else => |e| return e, - }, - .write => write(op) catch |err| switch (err) { - error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForWriting else e, - else => |e| return e, - }, - .accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0), - .connect => _ = try std.posix.connect(op.socket, op.addr, op.addrlen), - .recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0), - .send => { - const written = try std.posix.send(op.socket, op.buffer, 0); - if (op.out_written) |w| w.* = written; - }, - .open_at => if (builtin.target.os.tag == .windows) { - op.out_file.* = try op.dir.openFileZ(op.path, op.flags); - } else { - op.out_file.handle = try std.posix.openatZ(op.dir.fd, op.path, convertOpenFlags(op.flags), 0); - }, - .close_file => std.posix.close(op.file.handle), - .close_dir => std.posix.close(op.dir.fd), - .rename_at => { - if (@hasDecl(std.posix.system, "renameat2")) { - const res = std.posix.system.renameat2(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE); - const e = std.posix.errno(res); + while (true) { // for handling INTR + switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { + .fsync => _ = try std.posix.fsync(op.file.handle), + .read => read(op) catch |err| switch (err) { + error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForReading else e, + else => |e| return e, + }, + .write => write(op) catch |err| switch (err) { + error.Unexpected => |e| return if (builtin.target.os.tag == .windows) error.NotOpenForWriting else e, + else => |e| return e, + }, + .accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0), + .connect => _ = try std.posix.connect(op.socket, op.addr, op.addrlen), + .recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0), + .send => { + const written = try std.posix.send(op.socket, op.buffer, 0); + if (op.out_written) |w| w.* = written; + }, + .recv_msg => { + const recvmsg = if (@hasDecl(std.posix.system, "recvmsg")) + std.posix.system.recvmsg + else + std.c.recvmsg; + const e = std.posix.errno(recvmsg(op.socket, op.out_msg, 0)); if (e != .SUCCESS) return switch (e) { - .SUCCESS, .INTR, .INVAL, .AGAIN => unreachable, - .CANCELED => error.Canceled, - .ACCES => error.AccessDenied, - .PERM => error.AccessDenied, - .BUSY => error.FileBusy, - .DQUOT => error.DiskQuota, - .FAULT => unreachable, - .ISDIR => error.IsDir, - .LOOP => error.SymLinkLoop, - .MLINK => error.LinkQuotaExceeded, - .NAMETOOLONG => error.NameTooLong, - .NOENT => error.FileNotFound, - .NOTDIR => error.NotDir, + .SUCCESS, .INVAL, .BADF, .NOTSOCK => unreachable, + .INTR, .AGAIN => continue, + .CONNREFUSED => error.ConnectionRefused, + .FAULT => error.Unexpected, .NOMEM => error.SystemResources, - .NOSPC => error.NoSpaceLeft, - .EXIST => error.PathAlreadyExists, - .NOTEMPTY => error.PathAlreadyExists, - .ROFS => error.ReadOnlyFileSystem, - .XDEV => error.RenameAcrossMountPoints, + .NOTCONN => error.SocketNotConnected, else => std.posix.unexpectedErrno(e), }; + }, + .send_msg => _ = try std.posix.sendmsg(op.socket, op.msg, 0), + .open_at => if (builtin.target.os.tag == .windows) { + op.out_file.* = try op.dir.openFileZ(op.path, op.flags); } else { - // this is racy :( - if (builtin.target.os.tag == .windows) { - // access is weird on windows - if (op.new_dir.openFileZ(op.new_path, .{ .mode = .read_write })) |f| { - f.close(); - return error.PathAlreadyExists; - } else |err| switch (err) { - error.FileNotFound => {}, // ok - else => |e| return e, - } + op.out_file.handle = try std.posix.openatZ(op.dir.fd, op.path, convertOpenFlags(op.flags), 0); + }, + .close_file => std.posix.close(op.file.handle), + .close_dir => std.posix.close(op.dir.fd), + .rename_at => { + if (@hasDecl(std.posix.system, "renameat2")) { + const res = std.posix.system.renameat2(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE); + const e = std.posix.errno(res); + if (e != .SUCCESS) return switch (e) { + .SUCCESS, .INVAL => unreachable, + .INTR, .AGAIN => continue, + .CANCELED => error.Canceled, + .ACCES => error.AccessDenied, + .PERM => error.AccessDenied, + .BUSY => error.FileBusy, + .DQUOT => error.DiskQuota, + .FAULT => unreachable, + .ISDIR => error.IsDir, + .LOOP => error.SymLinkLoop, + .MLINK => error.LinkQuotaExceeded, + .NAMETOOLONG => error.NameTooLong, + .NOENT => error.FileNotFound, + .NOTDIR => error.NotDir, + .NOMEM => error.SystemResources, + .NOSPC => error.NoSpaceLeft, + .EXIST => error.PathAlreadyExists, + .NOTEMPTY => error.PathAlreadyExists, + .ROFS => error.ReadOnlyFileSystem, + .XDEV => error.RenameAcrossMountPoints, + else => std.posix.unexpectedErrno(e), + }; } else { - if (op.new_dir.accessZ(op.new_path, .{ .mode = .read_write })) { - return error.PathAlreadyExists; - } else |err| switch (err) { - error.FileNotFound => {}, // ok - else => |e| return e, + // this is racy :( + if (builtin.target.os.tag == .windows) { + // access is weird on windows + if (op.new_dir.openFileZ(op.new_path, .{ .mode = .read_write })) |f| { + f.close(); + return error.PathAlreadyExists; + } else |err| switch (err) { + error.FileNotFound => {}, // ok + else => |e| return e, + } + } else { + if (op.new_dir.accessZ(op.new_path, .{ .mode = .read_write })) { + return error.PathAlreadyExists; + } else |err| switch (err) { + error.FileNotFound => {}, // ok + else => |e| return e, + } } + try std.posix.renameatZ(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path); } - try std.posix.renameatZ(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path); - } - }, - .unlink_at => try std.posix.unlinkatZ(op.dir.fd, op.path, 0), - .mkdir_at => try std.posix.mkdiratZ(op.dir.fd, op.path, op.mode), - .symlink_at => if (builtin.target.os.tag == .windows) { - try op.dir.symLinkZ(op.target, op.link_path, .{}); - } else { - try std.posix.symlinkatZ(op.target, op.dir.fd, op.link_path); - }, - .child_exit => { - if (builtin.target.os.tag == .windows) { - @panic("fixme"); - } else if (comptime @hasDecl(std.posix.system, "waitid")) { - var siginfo: std.posix.siginfo_t = undefined; - _ = std.posix.system.waitid(.PIDFD, readiness.fd, &siginfo, std.posix.W.EXITED | std.posix.W.NOHANG); - if (op.out_term) |term| term.* = statusToTerm(@intCast(siginfo.fields.common.second.sigchld.status)); + }, + .unlink_at => try std.posix.unlinkatZ(op.dir.fd, op.path, 0), + .mkdir_at => try std.posix.mkdiratZ(op.dir.fd, op.path, op.mode), + .symlink_at => if (builtin.target.os.tag == .windows) { + try op.dir.symLinkZ(op.target, op.link_path, .{}); } else { - const res = std.posix.waitpid(op.child, std.posix.W.NOHANG); - if (op.out_term) |term| term.* = statusToTerm(res.status); - } - }, - .socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol), - .close_socket => std.posix.close(op.socket), - .notify_event_source => op.source.notify(), - .wait_event_source => op.source.wait(), - .close_event_source => op.source.deinit(), - // this function is meant for execution on a thread, it makes no sense to execute these on a thread - .nop, .timeout, .link_timeout, .cancel => unreachable, + try std.posix.symlinkatZ(op.target, op.dir.fd, op.link_path); + }, + .child_exit => { + if (builtin.target.os.tag == .windows) { + @panic("fixme"); + } else if (comptime @hasDecl(std.posix.system, "waitid")) { + var siginfo: std.posix.siginfo_t = undefined; + _ = std.posix.system.waitid(.PIDFD, readiness.fd, &siginfo, std.posix.W.EXITED | std.posix.W.NOHANG); + if (op.out_term) |term| term.* = statusToTerm(@intCast(siginfo.fields.common.second.sigchld.status)); + } else { + const res = std.posix.waitpid(op.child, std.posix.W.NOHANG); + if (op.out_term) |term| term.* = statusToTerm(res.status); + } + }, + .socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol), + .close_socket => std.posix.close(op.socket), + .notify_event_source => op.source.notify(), + .wait_event_source => op.source.wait(), + .close_event_source => op.source.deinit(), + // this function is meant for execution on a thread, it makes no sense to execute these on a thread + .nop, .timeout, .link_timeout, .cancel => unreachable, + } + break; } } @@ -287,9 +308,9 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness { } break :blk .{ .fd = op.file.handle, .mode = .in }; }, - .accept, .recv => .{ .fd = op.socket, .mode = .in }, + .accept, .recv, .recv_msg => .{ .fd = op.socket, .mode = .in }, .socket, .connect => .{}, - .send => .{ .fd = op.socket, .mode = .out }, + .send, .send_msg => .{ .fd = op.socket, .mode = .out }, .open_at, .close_file, .close_dir, .close_socket => .{}, .timeout, .link_timeout => blk: { if (builtin.target.os.tag == .windows) { @@ -397,7 +418,7 @@ pub inline fn armReadiness(op: anytype, readiness: Readiness) ArmReadinessError! }, .nop => {}, .fsync, .read, .write => {}, - .socket, .accept, .connect, .recv, .send => {}, + .socket, .accept, .connect, .recv, .send, .recv_msg, .send_msg => {}, .open_at, .close_file, .close_dir, .close_socket => {}, .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => {}, .notify_event_source, .wait_event_source, .close_event_source => {}, @@ -410,7 +431,7 @@ pub inline fn closeReadiness(op: anytype, readiness: Readiness) void { .timeout, .link_timeout, .child_exit => true, .nop => false, .fsync, .read, .write => false, - .socket, .accept, .connect, .recv, .send => false, + .socket, .accept, .connect, .recv, .send, .recv_msg, .send_msg => false, .open_at, .close_file, .close_dir, .close_socket => false, .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => false, .notify_event_source, .wait_event_source, .close_event_source => false, diff --git a/src/aio/ops.zig b/src/aio/ops.zig index f7833bb..e838a10 100644 --- a/src/aio/ops.zig +++ b/src/aio/ops.zig @@ -113,7 +113,33 @@ pub const Send = struct { userdata: usize = 0, }; -// TODO: recvmsg, sendmsg +/// recvmsg(2) +pub const RecvMsg = struct { + pub const Error = error{ + ConnectionRefused, + ConnectionTimedOut, + ConnectionResetByPeer, + SystemResources, + SocketNotConnected, + } || SharedError; + socket: std.posix.socket_t, + out_msg: *std.posix.msghdr, + out_id: ?*Id = null, + out_error: ?*Error = null, + link: Link = .unlinked, + userdata: usize = 0, +}; + +/// std.posix.sendmsg +pub const SendMsg = struct { + pub const Error = std.posix.SendMsgError || SharedError; + socket: std.posix.socket_t, + msg: *const std.posix.msghdr_const, + out_id: ?*Id = null, + out_error: ?*Error = null, + link: Link = .unlinked, + userdata: usize = 0, +}; /// std.fs.Dir.openFile pub const OpenAt = struct { @@ -312,6 +338,8 @@ pub const Operation = enum { connect, recv, send, + recv_msg, + send_msg, open_at, close_file, close_dir, @@ -338,6 +366,8 @@ pub const Operation = enum { .connect = Connect, .recv = Recv, .send = Send, + .recv_msg = RecvMsg, + .send_msg = SendMsg, .open_at = OpenAt, .close_file = CloseFile, .close_dir = CloseDir,