From cdab4fbd2e1fb21d8167fbd3b4869f16213d5f09 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Thu, 20 Jun 2024 01:25:47 +0900 Subject: [PATCH] wip non linux support --- build.zig | 6 + docs/pages/coro-blocking-code.mdx | 5 +- examples/aio_dynamic.zig | 7 +- flake.nix | 4 +- src/aio.zig | 13 +- src/aio/common/eventfd.zig | 25 ++++ src/aio/common/id.zig | 80 ++++++++++++ src/aio/common/posix.zig | 47 +++++++ src/aio/fallback.zig | 196 ++++++++++++++++++++++++++++ src/aio/{linux.zig => io_uring.zig} | 184 +++----------------------- src/aio/ops.zig | 4 +- src/coro.zig | 4 +- 12 files changed, 397 insertions(+), 178 deletions(-) create mode 100644 src/aio/common/eventfd.zig create mode 100644 src/aio/common/id.zig create mode 100644 src/aio/common/posix.zig create mode 100644 src/aio/fallback.zig rename src/aio/{linux.zig => io_uring.zig} (78%) diff --git a/build.zig b/build.zig index 6dd313c..19a0066 100644 --- a/build.zig +++ b/build.zig @@ -4,11 +4,16 @@ pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); + var opts = b.addOptions(); + const fallback = b.option(bool, "fallback", "use fallback event loop") orelse false; + opts.addOption(bool, "fallback", fallback); + const aio = b.addModule("aio", .{ .root_source_file = b.path("src/aio.zig"), .target = target, .optimize = optimize, }); + aio.addImport("build_options", opts.createModule()); const coro = b.addModule("coro", .{ .root_source_file = b.path("src/coro.zig"), @@ -46,6 +51,7 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule()); if (mod == .coro) tst.root_module.addImport("aio", aio); const run = b.addRunArtifact(tst); test_step.dependOn(&run.step); diff --git a/docs/pages/coro-blocking-code.mdx b/docs/pages/coro-blocking-code.mdx index de6fc69..f895023 100644 --- a/docs/pages/coro-blocking-code.mdx +++ b/docs/pages/coro-blocking-code.mdx @@ -19,12 +19,13 @@ fn task(pool: *ThreadPool) !void { try std.testing.expectEqual(69, ret); } +var scheduler = try Scheduler.init(std.testing.allocator, .{}); +defer scheduler.deinit(); + var pool: ThreadPool = .{}; defer pool.deinit(); // pool must always be destroyed before scheduler try pool.start(std.testing.allocator, 0); -var scheduler = try Scheduler.init(std.testing.allocator, .{}); -defer scheduler.deinit(); _ = try scheduler.spawn(task, .{&pool}, .{}); try scheduler.run(); ``` diff --git a/examples/aio_dynamic.zig b/examples/aio_dynamic.zig index 5973a2e..24e2739 100644 --- a/examples/aio_dynamic.zig +++ b/examples/aio_dynamic.zig @@ -30,9 +30,12 @@ pub fn main() !void { }, }); - const ret = try work.complete(.blocking); + var num_work: u16 = 2; + while (num_work > 0) { + const ret = try work.complete(.blocking); + num_work -= ret.num_completed; + } log.info("{s}", .{buf[0..len]}); log.info("{s}", .{buf2[0..len2]}); - log.info("{}", .{ret}); } diff --git a/flake.nix b/flake.nix index 0855d19..8d3419c 100644 --- a/flake.nix +++ b/flake.nix @@ -11,7 +11,9 @@ # Zig flake helper # Check the flake.nix in zig2nix project for more options: # - env = zig2nix.outputs.zig-env.${system} { zig = zig2nix.outputs.packages.${system}.zig.master.bin; }; + env = zig2nix.outputs.zig-env.${system} { + zig = zig2nix.outputs.packages.${system}.zig.master.bin; + }; system-triple = env.lib.zigTripleFromString system; in with builtins; with env.lib; with env.pkgs.lib; rec { # nix run . diff --git a/src/aio.zig b/src/aio.zig index f6f8ddb..502c635 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -3,12 +3,13 @@ //! On linux this is a very shim wrapper around `io_uring`, on other systems there might be more overhead const std = @import("std"); +const build_options = @import("build_options"); pub const InitError = error{ OutOfMemory, PermissionDenied, - ProcessQuotaExceeded, - SystemQuotaExceeded, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, SystemResources, SystemOutdated, Unexpected, @@ -21,7 +22,6 @@ pub const QueueError = error{ pub const CompletionError = error{ CompletionQueueOvercommitted, - SubmissionQueueEntryInvalid, SystemResources, Unexpected, }; @@ -130,8 +130,11 @@ pub const EventSource = struct { }; const IO = switch (@import("builtin").target.os.tag) { - .linux => @import("aio/linux.zig"), - else => @compileError("unsupported os"), + .linux => if (build_options.fallback) + @import("aio/fallback.zig") + else + @import("aio/io_uring.zig"), + else => @import("aio/fallback.zig"), }; const ops = @import("aio/ops.zig"); diff --git a/src/aio/common/eventfd.zig b/src/aio/common/eventfd.zig new file mode 100644 index 0000000..96fcc1a --- /dev/null +++ b/src/aio/common/eventfd.zig @@ -0,0 +1,25 @@ +const std = @import("std"); + +pub const EventSource = struct { + fd: std.posix.fd_t, + + pub inline fn init() !@This() { + return .{ + .fd = try std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC), + }; + } + + pub inline fn deinit(self: *@This()) void { + std.posix.close(self.fd); + self.* = undefined; + } + + pub inline fn notify(self: *@This()) void { + _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; + } + + pub inline fn wait(self: *@This()) void { + var v: u64 = undefined; + _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; + } +}; diff --git a/src/aio/common/id.zig b/src/aio/common/id.zig new file mode 100644 index 0000000..8372502 --- /dev/null +++ b/src/aio/common/id.zig @@ -0,0 +1,80 @@ +const std = @import("std"); + +pub fn Pool(T: type, SZ: type) type { + return struct { + pub const Node = union(enum) { free: ?SZ, used: T }; + nodes: []Node, + free: ?SZ = null, + num_free: SZ = 0, + num_used: SZ = 0, + + pub const Error = error{ + OutOfMemory, + }; + + pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { + return .{ .nodes = try allocator.alloc(Node, n) }; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + allocator.free(self.nodes); + self.* = undefined; + } + + pub fn empty(self: *@This()) bool { + return self.num_used == self.num_free; + } + + pub fn next(self: *@This()) ?SZ { + if (self.free) |fslot| return fslot; + if (self.num_used >= self.nodes.len) return null; + return self.num_used; + } + + pub fn add(self: *@This(), item: T) Error!SZ { + if (self.free) |fslot| { + self.free = self.nodes[fslot].free; + self.nodes[fslot] = .{ .used = item }; + self.num_free -= 1; + return fslot; + } + if (self.num_used >= self.nodes.len) return error.OutOfMemory; + self.nodes[self.num_used] = .{ .used = item }; + defer self.num_used += 1; + return self.num_used; + } + + pub fn remove(self: *@This(), slot: SZ) void { + if (self.free) |fslot| { + self.nodes[slot] = .{ .free = fslot }; + } else { + self.nodes[slot] = .{ .free = null }; + } + self.free = slot; + self.num_free += 1; + } + + pub fn get(self: *@This(), slot: SZ) *T { + return &self.nodes[slot].used; + } + + pub const Iterator = struct { + items: []Node, + index: SZ = 0, + + pub fn next(self: *@This()) *T { + while (self.index < self.items.len) { + defer self.index += 1; + if (self.items[self.index] == .used) { + return &self.items[self.index].used; + } + } + return null; + } + }; + + pub fn iterator(self: *@This()) Iterator { + return .{ .items = self.nodes[0..self.num_used] }; + } + }; +} diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig new file mode 100644 index 0000000..b8c726f --- /dev/null +++ b/src/aio/common/posix.zig @@ -0,0 +1,47 @@ +const std = @import("std"); + +pub const RENAME_NOREPLACE = 1 << 0; + +pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { + var os_flags: std.posix.O = .{ + .ACCMODE = switch (flags.mode) { + .read_only => .RDONLY, + .write_only => .WRONLY, + .read_write => .RDWR, + }, + }; + if (@hasField(std.posix.O, "CLOEXEC")) os_flags.CLOEXEC = true; + if (@hasField(std.posix.O, "LARGEFILE")) os_flags.LARGEFILE = true; + if (@hasField(std.posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty; + + // Use the O locking flags if the os supports them to acquire the lock + // atomically. + const has_flock_open_flags = @hasField(std.posix.O, "EXLOCK"); + if (has_flock_open_flags) { + // Note that the NONBLOCK flag is removed after the openat() call + // is successful. + switch (flags.lock) { + .none => {}, + .shared => { + os_flags.SHLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + .exclusive => { + os_flags.EXLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + } + } + return os_flags; +} + +pub fn statusToTerm(status: u32) std.process.Child.Term { + return if (std.posix.W.IFEXITED(status)) + .{ .Exited = std.posix.W.EXITSTATUS(status) } + else if (std.posix.W.IFSIGNALED(status)) + .{ .Signal = std.posix.W.TERMSIG(status) } + else if (std.posix.W.IFSTOPPED(status)) + .{ .Stopped = std.posix.W.STOPSIG(status) } + else + .{ .Unknown = status }; +} diff --git a/src/aio/fallback.zig b/src/aio/fallback.zig new file mode 100644 index 0000000..03ae56e --- /dev/null +++ b/src/aio/fallback.zig @@ -0,0 +1,196 @@ +const std = @import("std"); +const aio = @import("../aio.zig"); +const Operation = @import("ops.zig").Operation; +const Pool = @import("common/id.zig").Pool; +const posix = @import("common/posix.zig"); + +// This code does not work yet fully + +pub const EventSource = @import("common/eventfd.zig").EventSource; + +const StateSet = struct { + finished: std.bit_set.DynamicBitSetUnmanaged, + failed: std.bit_set.DynamicBitSetUnmanaged, + mutex: std.Thread.Mutex = .{}, + + pub fn init(allocator: std.mem.Allocator, n: u16) !@This() { + var finished = try std.bit_set.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer finished.deinit(allocator); + var failed = try std.bit_set.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer failed.deinit(allocator); + return .{ .finished = finished, .failed = failed }; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + self.finished.deinit(allocator); + self.failed.deinit(allocator); + self.* = undefined; + } +}; + +// TODO +const Linkage = struct {}; + +fd: std.posix.fd_t, +pool: *std.Thread.Pool, +linkage: Pool(Linkage, u16), +state: StateSet, + +pub fn init(allocator: std.mem.Allocator, n: u16) aio.InitError!@This() { + var pool = try allocator.create(std.Thread.Pool); + pool.init(.{ .allocator = allocator, .n_jobs = null }) catch |err| return switch (err) { + error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources, + else => |e| e, + }; + const fd = try std.posix.eventfd(0, std.os.linux.EFD.NONBLOCK | std.os.linux.EFD.CLOEXEC); + errdefer std.posix.close(fd); + var ops = try Pool(Linkage, u16).init(allocator, n); + errdefer ops.deinit(allocator); + var state = try StateSet.init(allocator, n); + errdefer state.deinit(allocator); + return .{ .fd = fd, .pool = pool, .linkage = ops, .state = state }; +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + self.pool.deinit(); + self.linkage.deinit(allocator); + self.state.deinit(allocator); + std.posix.close(self.fd); + self.* = undefined; +} + +fn performOperationPosix(op: anytype) !void { + var u_64: u64 align(1) = undefined; + switch (comptime Operation.tagFromPayloadType(@TypeOf(op))) { + .fsync => try std.posix.fsync(op.file.handle), + .read => op.out_read.* = try std.posix.read(op.file.handle, op.buffer), + .write => if (op.out_written) |w| { + w.* = try std.posix.write(op.file.handle, op.buffer); + }, + .accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0), + .connect => try std.posix.connect(op.socket, op.addr, op.addrlen), + .recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0), + .send => if (op.out_written) |w| { + w.* = try std.posix.send(op.socket, op.buffer, 0); + }, + .open_at => try std.posix.openat(op.dir.fd, op.path, posix.convertOpenFlags(op.flags), 0), + .close_file => std.posix.close(op.file.handle), + .close_dir => std.posix.close(op.dir.fd), + .timeout => std.time.sleep(@intCast(op.ns)), // waste + .link_timeout => @compileError("figure out how to implement"), + .cancel => unreachable, + .rename_at => try std.posix.renameat(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, posix.RENAME_NOREPLACE), + .unlink_at => try std.posix.unlinkat(op.dir.fd, op.path), + .mkdir_at => try std.posix.mkdirat(op.dir.fd, op.path, op.mode), + .symlink_at => try std.posix.symlinkat(op.target, op.dir.fd, op.link_path), + .child_exit => { + try std.posix.waitid(.PID, op.child, @constCast(&op._), std.posix.W.EXITED); + if (op.out_term) |term| { + term.* = posix.statusToTerm(@intCast(op._.fields.common.second.sigchld.status)); + } + }, + .socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol), + .close_socket => std.posix.close(op.socket), + .notify_event_source => std.posix.write(op.source.native.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable, + .wait_event_source => std.posix.read(op.source.native.fd, .{ .buffer = std.mem.asBytes(&u_64) }) catch unreachable, + .close_event_source => std.posix.close(op.source.native.fd), + } +} +const Sync = struct { + start: std.Thread.ResetEvent = .{}, + wg: std.Thread.WaitGroup = .{}, + failure: bool = false, +}; + +fn executor(self: *@This(), sync: *Sync, op: anytype, n: u16) void { + sync.wg.start(); + sync.start.wait(); + if (sync.failure) { + sync.wg.finish(); + return; + } + + var did_fail = false; + defer { + self.state.mutex.lock(); + switch (op.counter) { + .dec => |c| c.* -= 1, + .inc => |c| c.* += 1, + .nop => {}, + } + self.state.finished.set(n); + if (did_fail) self.state.failed.set(n); + _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; + self.state.mutex.unlock(); + } + + if (@hasField(@TypeOf(op), "out_id")) { + if (self.state.failed.isSet(n)) { + if (op.out_error) |err| err.* = error.OperationCanceled; + did_fail = true; + return; + } + } + + performOperationPosix(op) catch |op_err| { + if (op.out_error) |err| err.* = op_err; + did_fail = true; + }; +} + +inline fn queueOperation(self: *@This(), sync: *Sync, op: anytype) aio.QueueError!u16 { + const n = self.linkage.next() orelse return error.OutOfMemory; + if (@hasField(@TypeOf(op.*), "out_id")) { + if (op.out_id) |id| id.* = @enumFromInt(n); + } + if (op.out_error) |out_error| out_error.* = error.Success; + try self.pool.spawn(executor, .{ self, sync, op.*, n }); + return self.linkage.add(.{}) catch unreachable; +} + +pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.QueueError!void { + var sync: Sync = .{}; + var ids: std.BoundedArray(u16, len) = .{}; + errdefer { + sync.failure = true; + sync.start.set(); + sync.wg.wait(); + } + errdefer for (ids.constSlice()) |id| self.linkage.remove(id); + inline for (&work.ops) |*op| { + switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { + .cancel => { + self.state.mutex.lock(); + defer self.state.mutex.unlock(); + self.state.failed.set(@intFromEnum(op.id)); + }, + else => ids.append(try self.queueOperation(&sync, op)) catch unreachable, + } + } + sync.start.set(); +} + +pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.CompletionError!aio.CompletionResult { + var fds: [1]std.posix.pollfd = .{.{ .fd = self.fd, .events = std.posix.POLL.IN, .revents = 0 }}; + _ = std.posix.poll(&fds, if (mode == .blocking) -1 else 0) catch |err| return switch (err) { + error.NetworkSubsystemFailed => unreachable, + else => |e| e, + }; + self.state.mutex.lock(); + defer self.state.mutex.unlock(); + var n: u64 align(1) = undefined; + _ = std.posix.read(self.fd, std.mem.asBytes(&n)) catch unreachable; + const num_completed: u16 = @intCast(self.state.finished.count()); + const num_errors: u16 = @intCast(self.state.failed.count()); + self.state.finished.unsetAll(); + self.state.failed.unsetAll(); + return .{ .num_completed = num_completed, .num_errors = num_errors }; +} + +pub fn immediate(comptime len: u16, work: anytype) aio.ImmediateError!u16 { + var num_errors: u16 = 0; + inline for (&work.ops) |*op| performOperationPosix(op.*) catch { + num_errors += 1; + }; + return len; +} diff --git a/src/aio/linux.zig b/src/aio/io_uring.zig similarity index 78% rename from src/aio/linux.zig rename to src/aio/io_uring.zig index e6f7987..35caa83 100644 --- a/src/aio/linux.zig +++ b/src/aio/io_uring.zig @@ -1,41 +1,16 @@ const std = @import("std"); const aio = @import("../aio.zig"); const Operation = @import("ops.zig").Operation; +const Pool = @import("common/id.zig").Pool; +const posix = @import("common/posix.zig"); -pub const EventSource = struct { - fd: std.posix.fd_t, - - pub inline fn init() !@This() { - return .{ - .fd = std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC) catch |err| return switch (err) { - error.SystemResources => error.SystemResources, - error.ProcessFdQuotaExceeded => error.ProcessQuotaExceeded, - error.SystemFdQuotaExceeded => error.SystemQuotaExceeded, - error.Unexpected => error.Unexpected, - }, - }; - } - - pub inline fn deinit(self: *@This()) void { - std.posix.close(self.fd); - self.* = undefined; - } - - pub inline fn notify(self: *@This()) void { - _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; - } - - pub inline fn wait(self: *@This()) void { - var v: u64 = undefined; - _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; - } -}; +pub const EventSource = @import("common/eventfd.zig").EventSource; io: std.os.linux.IoUring, ops: Pool(Operation.Union, u16), pub fn init(allocator: std.mem.Allocator, n: u16) aio.InitError!@This() { - const n2 = std.math.ceilPowerOfTwo(u16, n) catch return error.SystemQuotaExceeded; + const n2 = std.math.ceilPowerOfTwo(u16, n) catch unreachable; var io = try uring_init(n2); errdefer io.deinit(); const ops = try Pool(Operation.Union, u16).init(allocator, n2); @@ -91,7 +66,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.Completion } pub fn immediate(comptime len: u16, work: anytype) aio.ImmediateError!u16 { - var io = try uring_init(std.math.ceilPowerOfTwo(u16, len) catch return error.SystemQuotaExceeded); + var io = try uring_init(std.math.ceilPowerOfTwo(u16, len) catch unreachable); defer io.deinit(); inline for (&work.ops, 0..) |*op, idx| try uring_queue(&io, op, idx); var num = try uring_submit(&io); @@ -113,60 +88,29 @@ pub fn immediate(comptime len: u16, work: anytype) aio.ImmediateError!u16 { inline fn uring_init(n: u16) aio.InitError!std.os.linux.IoUring { return std.os.linux.IoUring.init(n, 0) catch |err| switch (err) { - error.PermissionDenied, error.SystemResources, error.SystemOutdated => |e| e, - error.ProcessFdQuotaExceeded => error.ProcessQuotaExceeded, - error.SystemFdQuotaExceeded => error.SystemQuotaExceeded, + error.PermissionDenied, + error.SystemResources, + error.SystemOutdated, + error.ProcessFdQuotaExceeded, + error.SystemFdQuotaExceeded, + => |e| e, else => error.Unexpected, }; } -fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { - var os_flags: std.posix.O = .{ - .ACCMODE = switch (flags.mode) { - .read_only => .RDONLY, - .write_only => .WRONLY, - .read_write => .RDWR, - }, - }; - if (@hasField(std.posix.O, "CLOEXEC")) os_flags.CLOEXEC = true; - if (@hasField(std.posix.O, "LARGEFILE")) os_flags.LARGEFILE = true; - if (@hasField(std.posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty; - - // Use the O locking flags if the os supports them to acquire the lock - // atomically. - const has_flock_open_flags = @hasField(std.posix.O, "EXLOCK"); - if (has_flock_open_flags) { - // Note that the NONBLOCK flag is removed after the openat() call - // is successful. - switch (flags.lock) { - .none => {}, - .shared => { - os_flags.SHLOCK = true; - os_flags.NONBLOCK = flags.lock_nonblocking; - }, - .exclusive => { - os_flags.EXLOCK = true; - os_flags.NONBLOCK = flags.lock_nonblocking; - }, - } - } - return os_flags; -} - inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) aio.QueueError!void { const Trash = struct { var u_64: u64 align(1) = undefined; }; - const RENAME_NOREPLACE = 1 << 0; var sqe = switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { .fsync => try io.fsync(user_data, op.file.handle, 0), .read => try io.read(user_data, op.file.handle, .{ .buffer = op.buffer }, op.offset), .write => try io.write(user_data, op.file.handle, op.buffer, op.offset), - .accept => try io.accept(user_data, op.socket, @ptrCast(op.addr), op.inout_addrlen, 0), - .connect => try io.connect(user_data, op.socket, @ptrCast(op.addr), op.addrlen), + .accept => try io.accept(user_data, op.socket, op.addr, op.inout_addrlen, 0), + .connect => try io.connect(user_data, op.socket, op.addr, op.addrlen), .recv => try io.recv(user_data, op.socket, .{ .buffer = op.buffer }, 0), .send => try io.send(user_data, op.socket, op.buffer, 0), - .open_at => try io.openat(user_data, op.dir.fd, op.path, convertOpenFlags(op.flags), 0), + .open_at => try io.openat(user_data, op.dir.fd, op.path, posix.convertOpenFlags(op.flags), 0), .close_file => try io.close(user_data, op.file.handle), .close_dir => try io.close(user_data, op.dir.fd), .timeout => blk: { @@ -185,7 +129,7 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai break :blk try io.link_timeout(user_data, &ts, 0); }, .cancel => try io.cancel(user_data, @intFromEnum(op.id), 0), - .rename_at => try io.renameat(user_data, op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE), + .rename_at => try io.renameat(user_data, op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, posix.RENAME_NOREPLACE), .unlink_at => try io.unlinkat(user_data, op.dir.fd, op.path, 0), .mkdir_at => try io.mkdirat(user_data, op.dir.fd, op.path, op.mode), .symlink_at => try io.symlinkat(user_data, op.target, op.dir.fd, op.link_path), @@ -211,8 +155,9 @@ inline fn uring_submit(io: *std.os.linux.IoUring) aio.CompletionError!u16 { error.BufferInvalid => unreachable, error.OpcodeNotSupported => unreachable, error.RingShuttingDown => unreachable, + error.SubmissionQueueEntryInvalid => unreachable, error.SignalInterrupt => continue, - error.CompletionQueueOvercommitted, error.SubmissionQueueEntryInvalid, error.Unexpected, error.SystemResources => |e| return e, + error.CompletionQueueOvercommitted, error.Unexpected, error.SystemResources => |e| return e, }; return @intCast(n); } @@ -226,25 +171,15 @@ inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uri error.BufferInvalid => unreachable, error.OpcodeNotSupported => unreachable, error.RingShuttingDown => unreachable, + error.SubmissionQueueEntryInvalid => unreachable, error.SignalInterrupt => continue, - error.CompletionQueueOvercommitted, error.SubmissionQueueEntryInvalid, error.Unexpected, error.SystemResources => |e| return e, + error.CompletionQueueOvercommitted, error.Unexpected, error.SystemResources => |e| return e, }; return @intCast(n); } unreachable; } -fn statusToTerm(status: u32) std.process.Child.Term { - return if (std.posix.W.IFEXITED(status)) - .{ .Exited = std.posix.W.EXITSTATUS(status) } - else if (std.posix.W.IFSIGNALED(status)) - .{ .Signal = std.posix.W.TERMSIG(status) } - else if (std.posix.W.IFSTOPPED(status)) - .{ .Stopped = std.posix.W.STOPSIG(status) } - else - .{ .Unknown = status }; -} - inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) !void { switch (op.counter) { .dec => |c| c.* -= 1, @@ -533,87 +468,8 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) .cancel => {}, .rename_at, .unlink_at, .mkdir_at, .symlink_at => {}, .child_exit => if (op.out_term) |term| { - term.* = statusToTerm(@intCast(op._.fields.common.second.sigchld.status)); + term.* = posix.statusToTerm(@intCast(op._.fields.common.second.sigchld.status)); }, .socket => op.out_socket.* = cqe.res, } } - -pub fn Pool(T: type, SZ: type) type { - return struct { - pub const Node = union(enum) { free: ?SZ, used: T }; - nodes: []Node, - free: ?SZ = null, - num_free: SZ = 0, - num_used: SZ = 0, - - pub const Error = error{ - OutOfMemory, - }; - - pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { - return .{ .nodes = try allocator.alloc(Node, n) }; - } - - pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - allocator.free(self.nodes); - self.* = undefined; - } - - pub fn empty(self: *@This()) bool { - return self.num_used == self.num_free; - } - - pub fn next(self: *@This()) ?SZ { - if (self.free) |fslot| return fslot; - if (self.num_used >= self.nodes.len) return null; - return self.num_used; - } - - pub fn add(self: *@This(), item: T) Error!SZ { - if (self.free) |fslot| { - self.free = self.nodes[fslot].free; - self.nodes[fslot] = .{ .used = item }; - self.num_free -= 1; - return fslot; - } - if (self.num_used >= self.nodes.len) return error.OutOfMemory; - self.nodes[self.num_used] = .{ .used = item }; - defer self.num_used += 1; - return self.num_used; - } - - pub fn remove(self: *@This(), slot: SZ) void { - if (self.free) |fslot| { - self.nodes[slot] = .{ .free = fslot }; - } else { - self.nodes[slot] = .{ .free = null }; - } - self.free = slot; - self.num_free += 1; - } - - pub fn get(self: *@This(), slot: SZ) *T { - return &self.nodes[slot].used; - } - - pub const Iterator = struct { - items: []Node, - index: SZ = 0, - - pub fn next(self: *@This()) *T { - while (self.index < self.items.len) { - defer self.index += 1; - if (self.items[self.index] == .used) { - return &self.items[self.index].used; - } - } - return null; - } - }; - - pub fn iterator(self: *@This()) Iterator { - return .{ .items = self.nodes[0..self.num_used] }; - } - }; -} diff --git a/src/aio/ops.zig b/src/aio/ops.zig index 42a4a69..aa01402 100644 --- a/src/aio/ops.zig +++ b/src/aio/ops.zig @@ -227,8 +227,8 @@ pub const ChildExit = struct { child: std.process.Child.Id, out_term: ?*std.process.Child.Term = null, _: switch (builtin.target.os.tag) { - .linux => std.os.linux.siginfo_t, - else => @compileError("unsupported os"), + .windows => @compileError("unsupported os"), + else => std.posix.siginfo_t, } = undefined, out_id: ?*Id = null, out_error: ?*Error = null, diff --git a/src/coro.zig b/src/coro.zig index d3af3f2..f93f1dd 100644 --- a/src/coro.zig +++ b/src/coro.zig @@ -425,11 +425,11 @@ test "ThreadPool" { try std.testing.expectEqual(task1_done.*, true); } }; + var scheduler = try Scheduler.init(std.testing.allocator, .{}); + defer scheduler.deinit(); var pool: ThreadPool = .{}; defer pool.deinit(); try pool.start(std.testing.allocator, 0); - var scheduler = try Scheduler.init(std.testing.allocator, .{}); - defer scheduler.deinit(); var task1_done: bool = false; const task1 = try scheduler.spawn(Test.task1, .{&task1_done}, .{}); _ = try scheduler.spawn(Test.task2, .{ task1, &pool, &task1_done }, .{});