From 6894083786d1a998b99f2175f8c01319894f4fc4 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Tue, 18 Jun 2024 18:25:54 +0900 Subject: [PATCH] Fix compiler crashes --- src/aio.zig | 13 +- src/aio/linux.zig | 20 +-- src/aio/ops.zig | 341 ++++++++++++++++++++++++++++++---------------- src/coro.zig | 16 ++- 4 files changed, 256 insertions(+), 134 deletions(-) diff --git a/src/aio.zig b/src/aio.zig index b05090b..aace23c 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -27,8 +27,6 @@ pub const CompletionError = error{ Unexpected, }; -pub const OperationError = ops.ErrorUnion; - pub const ImmediateError = InitError || QueueError || CompletionError; pub const CompletionResult = struct { @@ -77,7 +75,12 @@ pub const Dynamic = struct { /// Completes a list of operations immediately, blocks until complete /// For error handling you must check the `out_error` field in the operation pub inline fn batch(operations: anytype) ImmediateError!CompletionResult { - return IO.immediate(operations.len, &struct { ops: @TypeOf(operations) }{ .ops = operations }); + const ti = @typeInfo(@TypeOf(operations)); + if (comptime ti == .Struct and ti.Struct.is_tuple) { + return IO.immediate(operations.len, &struct { ops: @TypeOf(operations) }{ .ops = operations }); + } else { + @compileError("expected a tuple of operations"); + } } /// Completes a list of operations immediately, blocks until complete @@ -98,7 +101,8 @@ pub inline fn single(operation: anytype) (ImmediateError || OperationError)!void const ops = @import("aio/ops.zig"); pub const Id = ops.Id; -pub const Sync = ops.Sync; +pub const OperationError = ops.Operation.Error; +pub const Fsync = ops.Fsync; pub const Read = ops.Read; pub const Write = ops.Write; pub const Accept = ops.Accept; @@ -115,6 +119,7 @@ pub const RenameAt = ops.RenameAt; pub const UnlinkAt = ops.UnlinkAt; pub const MkDirAt = ops.MkDirAt; pub const SymlinkAt = ops.SymlinkAt; +pub const WaitPid = ops.WaitPid; pub const Socket = ops.Socket; pub const CloseSocket = ops.CloseSocket; diff --git a/src/aio/linux.zig b/src/aio/linux.zig index 7e8bf4c..07dbadb 100644 --- a/src/aio/linux.zig +++ b/src/aio/linux.zig @@ -4,13 +4,13 @@ const Operation = @import("ops.zig").Operation; const ErrorUnion = @import("ops.zig").ErrorUnion; io: std.os.linux.IoUring, -ops: Pool(Operation, u16), +ops: Pool(Operation.Union, u16), pub fn init(allocator: std.mem.Allocator, n: u16) aio.InitError!@This() { const n2 = try std.math.ceilPowerOfTwo(u16, n); var io = try uring_init(n2); errdefer io.deinit(); - const ops = try Pool(Operation, u16).init(allocator, n2); + const ops = try Pool(Operation.Union, u16).init(allocator, n2); errdefer ops.deinit(allocator); return .{ .io = io, .ops = ops }; } @@ -25,7 +25,7 @@ inline fn queueOperation(self: *@This(), op: anytype) aio.QueueError!u16 { const n = self.ops.next() orelse return error.Overflow; try uring_queue(&self.io, op, n); const tag = @tagName(comptime Operation.tagFromPayloadType(@TypeOf(op.*))); - return self.ops.add(@unionInit(Operation, tag, op.*)) catch unreachable; + return self.ops.add(@unionInit(Operation.Union, tag, op.*)) catch unreachable; } pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.QueueError!void { @@ -134,8 +134,8 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai .fsync => try io.fsync(user_data, op.file.handle, 0), .read => try io.read(user_data, op.file.handle, .{ .buffer = op.buffer }, op.offset), .write => try io.write(user_data, op.file.handle, op.buffer, op.offset), - .accept => try io.accept(user_data, op.socket, @ptrCast(@alignCast(op.addr)), op.inout_addrlen, 0), - .connect => try io.connect(user_data, op.socket, @ptrCast(@alignCast(op.addr)), op.addrlen), + .accept => try io.accept(user_data, op.socket, @ptrCast(op.addr), op.inout_addrlen, 0), + .connect => try io.connect(user_data, op.socket, @ptrCast(op.addr), op.addrlen), .recv => try io.recv(user_data, op.socket, .{ .buffer = op.buffer }, 0), .send => try io.send(user_data, op.socket, op.buffer, 0), .open_at => try io.openat(user_data, op.dir.handle, op.path, convertOpenFlags(op.flags)), @@ -148,12 +148,14 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai .unlink_at => try io.unlinkat(user_data, op.dir.handle, op.path, 0), .mkdir_at => try io.mkdirat(user_data, op.dir.handle, op.path, op.mode), .symlink_at => try io.symlinkat(user_data, op.target, op.dir.handle, op.link_path), - // .waitid => try io.waitid(user_data, .PID, op.child, &op._, 0, 0), + .waitpid => try io.waitid(user_data, .PID, op.child, &op._, 0, 0), .socket => try io.socket(user_data, op.domain, op.flags, op.protocol, 0), .close_socket => try io.close(user_data, op.socket), }; if (op.link_next) sqe.flags |= std.os.linux.IOSQE_IO_LINK; - if (op.out_id) |id| id.* = @enumFromInt(user_data); + if (@hasField(@TypeOf(op.*), "out_id")) { + if (op.out_id) |id| id.* = @enumFromInt(user_data); + } } inline fn uring_submit(io: *std.os.linux.IoUring) aio.CompletionError!u16 { @@ -450,7 +452,7 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) .ROFS => error.ReadOnlyFileSystem, else => std.posix.unexpectedErrno(err), }, - // .waitid => unreachable, + .waitpid => unreachable, .socket => switch (err) { .SUCCESS, .INTR, .AGAIN, .FAULT => unreachable, .ACCES => error.PermissionDenied, @@ -488,7 +490,7 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) .timeout, .timeout_remove, .link_timeout => {}, .cancel => {}, .rename_at, .unlink_at, .mkdir_at, .symlink_at => {}, - // .waitid => op.out_term.* = statusToTerm(@intCast(op._.fields.common.second.sigchld.status)), + .waitpid => op.out_term.* = statusToTerm(@intCast(op._.fields.common.second.sigchld.status)), .socket => op.out_socket.* = cqe.res, } } diff --git a/src/aio/ops.zig b/src/aio/ops.zig index ec82a59..e727ce0 100644 --- a/src/aio/ops.zig +++ b/src/aio/ops.zig @@ -5,141 +5,239 @@ const builtin = @import("builtin"); pub const Id = enum(usize) { _ }; +// Counter that either increases or decreases a value in a given address +// Reserved when using the coroutines API +const Counter = union(enum) { + inc: *u16, + dec: *u16, + nop: void, +}; + +const SharedError = error{ + Success, + OperationCanceled, +}; + /// std.fs.File.sync -pub const Fsync = Define(struct { +pub const Fsync = struct { + pub const Error = std.fs.File.SyncError || SharedError; file: std.fs.File, -}, std.fs.File.SyncError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.File.read -pub const Read = Define(struct { +pub const Read = struct { + pub const Error = std.fs.File.ReadError || SharedError; file: std.fs.File, buffer: []u8, offset: u64 = 0, out_read: *usize, -}, std.fs.File.ReadError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.File.write -pub const Write = Define(struct { +pub const Write = struct { + pub const Error = std.fs.File.WriteError || SharedError; file: std.fs.File, buffer: []const u8, offset: u64 = 0, out_written: ?*usize = null, -}, std.fs.File.WriteError); - -// For whatever reason the posix.sockaddr crashes the compiler, so use this -const sockaddr = anyopaque; + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.accept -pub const Accept = Define(struct { +pub const Accept = struct { + pub const Error = std.posix.AcceptError || SharedError; socket: std.posix.socket_t, - addr: ?*sockaddr = null, + addr: ?*std.posix.sockaddr = null, inout_addrlen: ?*std.posix.socklen_t = null, out_socket: *std.posix.socket_t, -}, std.posix.AcceptError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.connect -pub const Connect = Define(struct { +pub const Connect = struct { + pub const Error = std.posix.ConnectError || SharedError; socket: std.posix.socket_t, - addr: *const sockaddr, + addr: *const std.posix.sockaddr, addrlen: std.posix.socklen_t, -}, std.posix.ConnectError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.recv -pub const Recv = Define(struct { +pub const Recv = struct { + pub const Error = std.posix.RecvFromError || SharedError; socket: std.posix.socket_t, buffer: []u8, out_read: *usize, -}, std.posix.RecvFromError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.send -pub const Send = Define(struct { +pub const Send = struct { + pub const Error = std.posix.SendError || SharedError; socket: std.posix.socket_t, buffer: []const u8, out_written: ?*usize = null, -}, std.posix.SendError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; // TODO: recvmsg, sendmsg /// std.fs.Dir.openFile -pub const OpenAt = Define(struct { +pub const OpenAt = struct { + pub const Error = std.fs.File.OpenError || SharedError; dir: std.fs.Dir, path: [*:0]const u8, flags: std.fs.File.OpenFlags, out_file: *std.fs.File, -}, std.fs.File.OpenError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.File.close -pub const Close = Define(struct { +pub const Close = struct { + pub const Error = SharedError; file: std.fs.File, -}, error{}); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.time.Timer.start -pub const Timeout = Define(struct { +pub const Timeout = struct { + pub const Error = SharedError; ts: struct { sec: i64 = 0, nsec: i64 = 0 }, -}, error{}); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.time.Timer.cancel (if it existed) /// XXX: Overlap with `Cancel`, is this even needed? (io_uring) -pub const TimeoutRemove = Define(struct { +pub const TimeoutRemove = struct { + pub const Error = error{ Success, InProgress, NotFound }; id: Id, -}, error{ InProgress, NotFound }); + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// Timeout linked to a operation /// This must be linked last and the operation before must have set `link_next` to `true` /// If the operation finishes before the timeout the timeout will be canceled -pub const LinkTimeout = Define(struct { +pub const LinkTimeout = struct { + pub const Error = error{InProgress} || SharedError; ts: struct { sec: i64 = 0, nsec: i64 = 0 }, out_expired: ?*bool = null, -}, error{InProgress}); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// Cancel a operation -pub const Cancel = Define(struct { +pub const Cancel = struct { + pub const Error = error{ Success, InProgress, NotFound }; id: Id, -}, error{ InProgress, NotFound }); + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.rename -pub const RenameAt = Define(struct { +pub const RenameAt = struct { + pub const Error = std.fs.Dir.RenameError || SharedError; old_dir: std.fs.Dir, old_path: [*:0]const u8, new_dir: std.fs.Dir, new_path: [*:0]const u8, -}, std.fs.Dir.RenameError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.Dir.deleteFile -pub const UnlinkAt = Define(struct { +pub const UnlinkAt = struct { + pub const Error = std.posix.UnlinkatError || SharedError; dir: std.fs.Dir, path: [*:0]const u8, -}, std.posix.UnlinkatError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.Dir.makeDir -pub const MkDirAt = Define(struct { +pub const MkDirAt = struct { + pub const Error = std.fs.Dir.MakeError || SharedError; dir: std.fs.Dir, path: [*:0]const u8, mode: u32 = std.fs.Dir.default_mode, -}, std.fs.Dir.MakeError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.fs.Dir.symlink -pub const SymlinkAt = Define(struct { +pub const SymlinkAt = struct { + pub const Error = std.posix.SymLinkError || SharedError; dir: std.fs.Dir, target: [*:0]const u8, link_path: [*:0]const u8, -}, std.posix.SymLinkError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; // TODO: linkat /// std.process.Child.wait -/// TODO: Crashes compiler, doesn't like the std.process fields wut? -pub const WaitId = Define(struct { +pub const WaitPid = struct { + pub const Error = SharedError; child: std.process.Child.Id, out_term: *std.process.Child.Term, _: switch (builtin.target.os.tag) { .linux => std.os.linux.siginfo_t, else => @compileError("unsupported os"), }, -}, error{}); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.socket -pub const Socket = Define(struct { +pub const Socket = struct { + pub const Error = std.posix.SocketError || SharedError; /// std.posix.AF domain: u32, /// std.posix.SOCK @@ -147,90 +245,97 @@ pub const Socket = Define(struct { /// std.posix.IPPROTO protocol: u32, out_socket: *std.posix.socket_t, -}, std.posix.SocketError); + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; /// std.posix.close -pub const CloseSocket = Define(struct { +pub const CloseSocket = struct { + pub const Error = std.posix.SocketError || SharedError; socket: std.posix.socket_t, -}, error{}); - -pub const Operation = union(enum) { - fsync: Fsync, - read: Read, - write: Write, - accept: Accept, - connect: Connect, - recv: Recv, - send: Send, - open_at: OpenAt, - close: Close, - timeout: Timeout, - timeout_remove: TimeoutRemove, - link_timeout: LinkTimeout, - cancel: Cancel, - rename_at: RenameAt, - unlink_at: UnlinkAt, - mkdir_at: MkDirAt, - symlink_at: SymlinkAt, - // waitid: WaitId, - socket: Socket, - close_socket: CloseSocket, - - pub fn tagFromPayloadType(Op: type) std.meta.Tag(Operation) { - inline for (std.meta.fields(Operation)) |field| { - if (Op == field.type) { + out_id: ?*Id = null, + out_error: ?*Error = null, + counter: Counter = .nop, + link_next: bool = false, +}; + +pub const Operation = enum { + fsync, + read, + write, + accept, + connect, + recv, + send, + open_at, + close, + timeout, + timeout_remove, + link_timeout, + cancel, + rename_at, + unlink_at, + mkdir_at, + symlink_at, + waitpid, + socket, + close_socket, + + pub const map = std.enums.EnumMap(@This(), type).init(.{ + .fsync = Fsync, + .read = Read, + .write = Write, + .accept = Accept, + .connect = Connect, + .recv = Recv, + .send = Send, + .open_at = OpenAt, + .close = Close, + .timeout = Timeout, + .timeout_remove = TimeoutRemove, + .link_timeout = LinkTimeout, + .cancel = Cancel, + .rename_at = RenameAt, + .unlink_at = UnlinkAt, + .mkdir_at = MkDirAt, + .symlink_at = SymlinkAt, + .waitpid = WaitPid, + .socket = Socket, + .close_socket = CloseSocket, + }); + + pub fn tagFromPayloadType(comptime Op: type) @This() { + inline for (map.values, 0..) |v, idx| { + if (Op == v) { @setEvalBranchQuota(1_000_0); - return std.meta.stringToEnum(std.meta.Tag(Operation), field.name) orelse unreachable; + return @enumFromInt(idx); } } unreachable; } -}; - -const SharedError = error{ - Success, - OperationCanceled, -}; -pub const ErrorUnion = SharedError || - std.fs.File.SyncError || - std.fs.File.ReadError || - std.fs.File.WriteError || - std.posix.AcceptError || - std.posix.ConnectError || - std.posix.RecvFromError || - std.posix.SendError || - std.fs.File.OpenError || - error{ InProgress, NotFound } || - std.fs.Dir.RenameError || - std.posix.UnlinkatError || - std.fs.Dir.MakeError || - std.posix.SymLinkError || - std.process.Child.SpawnError || - std.posix.SocketError; - -fn Define(T: type, E: type) type { - // Counter that either increases or decreases a value in a given address - // Reserved when using the coroutines API - const Counter = union(enum) { - inc: *u16, - dec: *u16, - nop: void, + pub const Error = blk: { + var set = error{}; + for (Operation.map.values) |v| set = set || v.Error; + break :blk set; }; - const Super = struct { - out_id: ?*Id = null, - out_error: ?*(E || SharedError) = null, - counter: Counter = .nop, - link_next: bool = false, + pub const Union = blk: { + var fields: []const std.builtin.Type.UnionField = &.{}; + for (Operation.map.values, 0..) |v, idx| fields = fields ++ .{.{ + .name = @tagName(@as(Operation, @enumFromInt(idx))), + .type = v, + .alignment = 0, + }}; + break :blk @Type(.{ + .Union = .{ + .layout = .auto, + .tag_type = Operation, + .fields = fields, + .decls = &.{}, + }, + }); }; - - return @Type(.{ - .Struct = .{ - .layout = .auto, - .fields = std.meta.fields(T) ++ std.meta.fields(Super), - .decls = &.{}, - .is_tuple = false, - }, - }); -} +}; diff --git a/src/coro.zig b/src/coro.zig index be32b3f..ea77cad 100644 --- a/src/coro.zig +++ b/src/coro.zig @@ -48,8 +48,12 @@ pub const io = struct { var work = struct { ops: @TypeOf(operations) }{ .ops = operations }; inline for (&work.ops, &state) |*op, *s| { op.counter = .{ .dec = &task.io_counter }; - s.old_id = op.out_id; - op.out_id = &s.id; + if (@hasDecl(@TypeOf(op.*), "out_id")) { + s.old_id = op.out_id; + op.out_id = &s.id; + } else { + s.old_id = null; + } s.old_err = op.out_error; op.out_error = @ptrCast(&s.err); } @@ -89,6 +93,7 @@ pub const io = struct { /// Completes a single operation immediately, blocks the coroutine until complete /// The IO operation can be cancelled by calling `wakeup` + /// TODO: combine this and multi to avoid differences/bugs in implementation pub fn single(operation: anytype) (aio.QueueError || aio.OperationError)!void { if (Fiber.current()) |fiber| { var task: *Scheduler.TaskState = @ptrFromInt(fiber.getUserDataPtr().*); @@ -97,7 +102,11 @@ pub const io = struct { var err: @TypeOf(op.out_error.?.*) = error.Success; var id: aio.Id = undefined; op.counter = .{ .dec = &task.io_counter }; - op.out_id = &id; + var old_id: ?*aio.Id = null; + if (@hasDecl(@TypeOf(op), "out_id")) { + old_id = op.out_id; + op.out_id = &id; + } op.out_error = &err; try task.io.queue(op); task.io_counter = 1; @@ -113,6 +122,7 @@ pub const io = struct { Fiber.yield(); } + if (old_id) |p| p.* = id; if (err != error.Success) return err; } else { unreachable; // this io function is only meant to be used in coroutines!