From ede8b2b6f4d087b49d392b07c7dd101a608eea98 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Thu, 20 Jun 2024 01:25:47 +0900 Subject: [PATCH] wip non linux support --- build.zig | 10 +- docs/pages/aio-immediate.mdx | 9 +- docs/pages/aio-operations.mdx | 11 +- docs/pages/coro-blocking-code.mdx | 5 +- docs/pages/index.mdx | 2 +- examples/aio_dynamic.zig | 7 +- .../{aio_static.zig => aio_immediate.zig} | 2 +- examples/coro.zig | 14 +- flake.nix | 4 +- src/aio.zig | 117 ++++-- src/aio/common/eventfd.zig | 23 ++ src/aio/common/posix.zig | 172 ++++++++ src/aio/common/types.zig | 121 ++++++ src/aio/fallback.zig | 382 ++++++++++++++++++ src/aio/{linux.zig => io_uring.zig} | 245 +++-------- src/aio/ops.zig | 73 ++-- src/coro.zig | 40 +- 17 files changed, 952 insertions(+), 285 deletions(-) rename examples/{aio_static.zig => aio_immediate.zig} (94%) create mode 100644 src/aio/common/eventfd.zig create mode 100644 src/aio/common/posix.zig create mode 100644 src/aio/common/types.zig create mode 100644 src/aio/fallback.zig rename src/aio/{linux.zig => io_uring.zig} (74%) diff --git a/build.zig b/build.zig index 6dd313c..5017657 100644 --- a/build.zig +++ b/build.zig @@ -4,11 +4,16 @@ pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); + var opts = b.addOptions(); + const fallback = b.option(bool, "fallback", "use fallback event loop") orelse false; + opts.addOption(bool, "fallback", fallback); + const aio = b.addModule("aio", .{ .root_source_file = b.path("src/aio.zig"), .target = target, .optimize = optimize, }); + aio.addImport("build_options", opts.createModule()); const coro = b.addModule("coro", .{ .root_source_file = b.path("src/coro.zig"), @@ -20,7 +25,7 @@ pub fn build(b: *std.Build) void { const run_all = b.step("run", "Run all examples"); inline for (.{ .aio_dynamic, - .aio_static, + .aio_immediate, .coro, }) |example| { const exe = b.addExecutable(.{ @@ -39,13 +44,16 @@ pub fn build(b: *std.Build) void { run_all.dependOn(&cmd.step); } + const test_filter = b.option([]const u8, "test-filter", "Skip tests that do not match any filter") orelse ""; const test_step = b.step("test", "Run unit tests"); inline for (.{ .aio, .coro }) |mod| { const tst = b.addTest(.{ .root_source_file = b.path("src/" ++ @tagName(mod) ++ ".zig"), .target = target, .optimize = optimize, + .filters = &.{test_filter}, }); + if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule()); if (mod == .coro) tst.root_module.addImport("aio", aio); const run = b.addRunArtifact(tst); test_step.dependOn(&run.step); diff --git a/docs/pages/aio-immediate.mdx b/docs/pages/aio-immediate.mdx index bdcc056..af8262a 100644 --- a/docs/pages/aio-immediate.mdx +++ b/docs/pages/aio-immediate.mdx @@ -29,14 +29,17 @@ var my_buffer: [1024]u8 = undefined; var my_len: usize = undefined; try aio.multi(.{ - aio.Write{.file = f, .buffer = "contents", .link_next = true}, + aio.Write{.file = f, .buffer = "contents", .link = .soft}, aio.Read{.file = f, .buffer = &my_buffer, .out_read = &my_len}, }); ``` -The `.link_next` field of operation can be used to link the operation to the next operation. +The `.link` field of operation can be used to link the operation to the next operation. When linking operations, the next operation won't start until this operation is complete. +`soft` link will propagate failure to next operations in the link chain. +`hard` link will not propagate failure, and the next operation starts normally. 
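+For illustration, a minimal sketch of the same two operations using a `hard` link instead of a `soft` one, so the read is still started even if the write fails (same `f`, `my_buffer` and `my_len` as in the example above):
+
+```zig
+try aio.multi(.{
+    // hard link: a failure of the write is not propagated, the read starts regardless
+    aio.Write{.file = f, .buffer = "contents", .link = .hard},
+    aio.Read{.file = f, .buffer = &my_buffer, .out_read = &my_len},
+});
+```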
+ #### Using complete Complete is similar to multi, but it will not return `error.SomeOperationFailed` in case any of the operations fail. @@ -51,7 +54,7 @@ var write_error: std.posix.WriteError = undefined; var read_error: std.posix.ReadError = undefined; const res = try aio.complete(.{ - aio.Write{.file = f, .buffer = "contents", .out_error = &write_error, .link_next = true}, + aio.Write{.file = f, .buffer = "contents", .out_error = &write_error, .link = .soft}, aio.Read{.file = f, .buffer = &my_buffer, .out_error = &read_error, .out_read = &my_len}, }); diff --git a/docs/pages/aio-operations.mdx b/docs/pages/aio-operations.mdx index c6fba6f..7b0bd44 100644 --- a/docs/pages/aio-operations.mdx +++ b/docs/pages/aio-operations.mdx @@ -24,12 +24,18 @@ link_next: bool = false, If `out_id` is set, the id of the operation will be stored into that address immediately after sucessful queue. The `id` can then be used in future operations to refer to this operation. + + If `out_error` is set, the error of the operation will be stored into that address, in case the operation failed. If there was no failure a `error.Success` will be stored in that address. `counter` can be used to set either decreasing or increasing counter. When operation completes it will either decrease or increase the `u16` stored at the address. -`link_next` can be used to link the next operation into this operation. + + +`link` can be used to link the next operation into this operation. When operations are linked, the next operation won't start until this operation has completed first. +`soft` link will propagate failure to next operations in the link chain. +`hard` link will not propagate failure, and the next operation starts normally. ### Fsync @@ -144,13 +150,12 @@ ns: i128, ### LinkTimeout Timeout linked to a operation. -The operation before must have set `link_next` to `true`. +The operation before must have set `link` to either `soft` or `hard`. If the operation finishes before the timeout, then the timeout will be canceled. If the timeout finishes before the operation, then the operation will be canceled. 
```zig ns: i128, -out_expired: ?*bool = null, ``` ### Cancel diff --git a/docs/pages/coro-blocking-code.mdx b/docs/pages/coro-blocking-code.mdx index de6fc69..f895023 100644 --- a/docs/pages/coro-blocking-code.mdx +++ b/docs/pages/coro-blocking-code.mdx @@ -19,12 +19,13 @@ fn task(pool: *ThreadPool) !void { try std.testing.expectEqual(69, ret); } +var scheduler = try Scheduler.init(std.testing.allocator, .{}); +defer scheduler.deinit(); + var pool: ThreadPool = .{}; defer pool.deinit(); // pool must always be destroyed before scheduler try pool.start(std.testing.allocator, 0); -var scheduler = try Scheduler.init(std.testing.allocator, .{}); -defer scheduler.deinit(); _ = try scheduler.spawn(task, .{&pool}, .{}); try scheduler.run(); ``` diff --git a/docs/pages/index.mdx b/docs/pages/index.mdx index 245989a..caf033a 100644 --- a/docs/pages/index.mdx +++ b/docs/pages/index.mdx @@ -7,7 +7,7 @@ title: 'zig-aio: io_uring like asynchronous API and coroutine powered IO tasks f zig-aio provides io_uring like asynchronous API and coroutine powered IO tasks for zig ```zig -// [!include ~/../examples/aio_static.zig] +// [!include ~/../examples/aio_immediate.zig] ``` ## Features diff --git a/examples/aio_dynamic.zig b/examples/aio_dynamic.zig index 5973a2e..24e2739 100644 --- a/examples/aio_dynamic.zig +++ b/examples/aio_dynamic.zig @@ -30,9 +30,12 @@ pub fn main() !void { }, }); - const ret = try work.complete(.blocking); + var num_work: u16 = 2; + while (num_work > 0) { + const ret = try work.complete(.blocking); + num_work -= ret.num_completed; + } log.info("{s}", .{buf[0..len]}); log.info("{s}", .{buf2[0..len2]}); - log.info("{}", .{ret}); } diff --git a/examples/aio_static.zig b/examples/aio_immediate.zig similarity index 94% rename from examples/aio_static.zig rename to examples/aio_immediate.zig index c2ffc2e..07dc754 100644 --- a/examples/aio_static.zig +++ b/examples/aio_immediate.zig @@ -1,6 +1,6 @@ const std = @import("std"); const aio = @import("aio"); -const log = std.log.scoped(.aio_static); +const log = std.log.scoped(.aio_immediate); pub fn main() !void { var f = try std.fs.cwd().openFile("flake.nix", .{}); diff --git a/examples/coro.zig b/examples/coro.zig index a28a615..962b80d 100644 --- a/examples/coro.zig +++ b/examples/coro.zig @@ -4,7 +4,7 @@ const coro = @import("coro"); const log = std.log.scoped(.coro_aio); pub const aio_coro_options: coro.Options = .{ - .debug = false, // set to true to enable debug logs + .debug = true, // set to true to enable debug logs }; const Yield = enum { @@ -26,6 +26,7 @@ fn server(client_task: coro.Task) !void { try std.posix.bind(socket, &address.any, address.getOsSockLen()); try std.posix.listen(socket, 128); + // TODO: needs .wait option coro.wakeupFromState(client_task, Yield.server_ready); var client_sock: std.posix.socket_t = undefined; @@ -34,16 +35,16 @@ fn server(client_task: coro.Task) !void { var buf: [1024]u8 = undefined; var len: usize = 0; try coro.io.multi(.{ - aio.Send{ .socket = client_sock, .buffer = "hey ", .link_next = true }, - aio.Send{ .socket = client_sock, .buffer = "I'm doing multiple IO ops at once ", .link_next = true }, - aio.Send{ .socket = client_sock, .buffer = "how cool is that? ", .link_next = true }, + aio.Send{ .socket = client_sock, .buffer = "hey ", .link = .soft }, + aio.Send{ .socket = client_sock, .buffer = "I'm doing multiple IO ops at once ", .link = .soft }, + aio.Send{ .socket = client_sock, .buffer = "how cool is that? 
", .link = .soft }, aio.Recv{ .socket = client_sock, .buffer = &buf, .out_read = &len }, }); log.warn("got reply from client: {s}", .{buf[0..len]}); try coro.io.multi(.{ - aio.Send{ .socket = client_sock, .buffer = "ok bye", .link_next = true }, - aio.CloseSocket{ .socket = client_sock, .link_next = true }, + aio.Send{ .socket = client_sock, .buffer = "ok bye", .link = .soft }, + aio.CloseSocket{ .socket = client_sock, .link = .soft }, aio.CloseSocket{ .socket = socket }, }); } @@ -64,7 +65,6 @@ fn client() !void { .socket = socket, .addr = &address.any, .addrlen = address.getOsSockLen(), - .link_next = true, }); while (true) { diff --git a/flake.nix b/flake.nix index 0855d19..8d3419c 100644 --- a/flake.nix +++ b/flake.nix @@ -11,7 +11,9 @@ # Zig flake helper # Check the flake.nix in zig2nix project for more options: # - env = zig2nix.outputs.zig-env.${system} { zig = zig2nix.outputs.packages.${system}.zig.master.bin; }; + env = zig2nix.outputs.zig-env.${system} { + zig = zig2nix.outputs.packages.${system}.zig.master.bin; + }; system-triple = env.lib.zigTripleFromString system; in with builtins; with env.lib; with env.pkgs.lib; rec { # nix run . diff --git a/src/aio.zig b/src/aio.zig index f6f8ddb..1010ec1 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -3,31 +3,31 @@ //! On linux this is a very shim wrapper around `io_uring`, on other systems there might be more overhead const std = @import("std"); +const build_options = @import("build_options"); -pub const InitError = error{ - OutOfMemory, - PermissionDenied, - ProcessQuotaExceeded, - SystemQuotaExceeded, - SystemResources, - SystemOutdated, - Unexpected, -}; +const root = @import("root"); +pub const options: Options = if (@hasDecl(root, "aio_options")) root.aio_options else .{}; -pub const QueueError = error{ - OutOfMemory, - SubmissionQueueFull, +pub const Options = struct { + /// Enable debug logs and tracing + debug: bool = true, + /// Num threads for a thread pool, if a backend requires one + num_threads: ?u32 = null, // by default use the cpu core count }; -pub const CompletionError = error{ +pub const Error = error{ + OutOfMemory, CompletionQueueOvercommitted, - SubmissionQueueEntryInvalid, + SubmissionQueueFull, + NoDevice, + PermissionDenied, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, SystemResources, + SystemOutdated, Unexpected, }; -pub const ImmediateError = InitError || QueueError || CompletionError; - pub const CompletionResult = struct { num_completed: u16 = 0, num_errors: u16 = 0, @@ -37,7 +37,7 @@ pub const CompletionResult = struct { pub const Dynamic = struct { io: IO, - pub inline fn init(allocator: std.mem.Allocator, n: u16) InitError!@This() { + pub inline fn init(allocator: std.mem.Allocator, n: u16) Error!@This() { return .{ .io = try IO.init(allocator, n) }; } @@ -48,7 +48,7 @@ pub const Dynamic = struct { /// Queue operations for future completion /// The call is atomic, if any of the operations fail to queue, then the given operations are reverted - pub inline fn queue(self: *@This(), operations: anytype) QueueError!void { + pub inline fn queue(self: *@This(), operations: anytype) Error!void { const ti = @typeInfo(@TypeOf(operations)); if (comptime ti == .Struct and ti.Struct.is_tuple) { var work = struct { ops: @TypeOf(operations) }{ .ops = operations }; @@ -70,7 +70,7 @@ pub const Dynamic = struct { /// Complete operations /// Returns the number of completed operations, `0` if no operations were completed - pub inline fn complete(self: *@This(), mode: CompletionMode) CompletionError!CompletionResult { + 
pub inline fn complete(self: *@This(), mode: CompletionMode) Error!CompletionResult { return self.io.complete(mode); } }; @@ -78,7 +78,7 @@ pub const Dynamic = struct { /// Completes a list of operations immediately, blocks until complete /// For error handling you must check the `out_error` field in the operation /// Returns the number of errors occured, 0 if there were no errors -pub inline fn complete(operations: anytype) ImmediateError!u16 { +pub inline fn complete(operations: anytype) Error!u16 { const ti = @typeInfo(@TypeOf(operations)); if (comptime ti == .Struct and ti.Struct.is_tuple) { if (comptime operations.len == 0) @compileError("no work to be done"); @@ -95,12 +95,12 @@ pub inline fn complete(operations: anytype) ImmediateError!u16 { /// Completes a list of operations immediately, blocks until complete /// Returns `error.SomeOperationFailed` if any operation failed -pub inline fn multi(operations: anytype) (ImmediateError || error{SomeOperationFailed})!void { +pub inline fn multi(operations: anytype) (Error || error{SomeOperationFailed})!void { if (try complete(operations) > 0) return error.SomeOperationFailed; } /// Completes a single operation immediately, blocks until complete -pub inline fn single(operation: anytype) (ImmediateError || @TypeOf(operation).Error)!void { +pub inline fn single(operation: anytype) (Error || @TypeOf(operation).Error)!void { var op: @TypeOf(operation) = operation; var err: @TypeOf(operation).Error = error.Success; op.out_error = &err; @@ -111,7 +111,14 @@ pub inline fn single(operation: anytype) (ImmediateError || @TypeOf(operation).E pub const EventSource = struct { native: IO.EventSource, - pub inline fn init() InitError!@This() { + pub const Error = error{ + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + SystemResources, + Unexpected, + }; + + pub inline fn init() @This().Error!@This() { return .{ .native = try IO.EventSource.init() }; } @@ -130,8 +137,11 @@ pub const EventSource = struct { }; const IO = switch (@import("builtin").target.os.tag) { - .linux => @import("aio/linux.zig"), - else => @compileError("unsupported os"), + .linux => if (build_options.fallback) + @import("aio/fallback.zig") + else + @import("aio/io_uring.zig"), + else => @import("aio/fallback.zig"), }; const ops = @import("aio/ops.zig"); @@ -201,7 +211,7 @@ test "Read" { defer f.close(); try f.writeAll("foobar"); try single(Read{ .file = f, .buffer = &buf, .out_read = &len }); - try std.testing.expectEqual(len, "foobar".len); + try std.testing.expectEqual("foobar".len, len); try std.testing.expectEqualSlices(u8, "foobar", buf[0..len]); } { @@ -223,7 +233,7 @@ test "Write" { var f = try tmp.dir.createFile("test", .{ .read = true }); defer f.close(); try single(Write{ .file = f, .buffer = "foobar", .out_written = &len }); - try std.testing.expectEqual(len, "foobar".len); + try std.testing.expectEqual("foobar".len, len); const read = try f.readAll(&buf); try std.testing.expectEqualSlices(u8, "foobar", buf[0..read]); } @@ -272,15 +282,48 @@ test "Timeout" { } test "LinkTimeout" { - var err: Timeout.Error = undefined; - var expired: bool = undefined; - const num_errors = try complete(.{ - Timeout{ .ns = 2 * std.time.ns_per_s, .out_error = &err, .link_next = true }, - LinkTimeout{ .ns = 1 * std.time.ns_per_s, .out_expired = &expired }, - }); - try std.testing.expectEqual(1, num_errors); - try std.testing.expectEqual(error.OperationCanceled, err); - try std.testing.expectEqual(true, expired); + { + var err: Timeout.Error = undefined; + var err2: LinkTimeout.Error = 
undefined; + const num_errors = try complete(.{ + Timeout{ .ns = 2 * std.time.ns_per_s, .out_error = &err, .link = .soft }, + LinkTimeout{ .ns = 1 * std.time.ns_per_s, .out_error = &err2 }, + }); + try std.testing.expectEqual(2, num_errors); + try std.testing.expectEqual(error.OperationCanceled, err); + try std.testing.expectEqual(error.Expired, err2); + } + { + const num_errors = try complete(.{ + Timeout{ .ns = 2 * std.time.ns_per_s, .link = .soft }, + LinkTimeout{ .ns = 1 * std.time.ns_per_s, .link = .soft }, + Timeout{ .ns = 3 * std.time.ns_per_s, .link = .soft }, + }); + try std.testing.expectEqual(3, num_errors); + } + { + const num_errors = try complete(.{ + Timeout{ .ns = 1 * std.time.ns_per_s, .link = .soft }, + LinkTimeout{ .ns = 2 * std.time.ns_per_s }, + }); + try std.testing.expectEqual(0, num_errors); + } + { + const num_errors = try complete(.{ + Timeout{ .ns = 1 * std.time.ns_per_s, .link = .soft }, + LinkTimeout{ .ns = 2 * std.time.ns_per_s, .link = .soft }, + Timeout{ .ns = 3 * std.time.ns_per_s, .link = .soft }, + }); + try std.testing.expectEqual(1, num_errors); + } + { + const num_errors = try complete(.{ + Timeout{ .ns = 1 * std.time.ns_per_s, .link = .hard }, + LinkTimeout{ .ns = 2 * std.time.ns_per_s, .link = .soft }, + Timeout{ .ns = 3 * std.time.ns_per_s, .link = .soft }, + }); + try std.testing.expectEqual(0, num_errors); + } } test "Cancel" { @@ -382,7 +425,7 @@ test "EventSource" { const source = try EventSource.init(); try multi(.{ NotifyEventSource{ .source = source }, - WaitEventSource{ .source = source, .link_next = true }, + WaitEventSource{ .source = source, .link = .hard }, CloseEventSource{ .source = source }, }); } diff --git a/src/aio/common/eventfd.zig b/src/aio/common/eventfd.zig new file mode 100644 index 0000000..005916f --- /dev/null +++ b/src/aio/common/eventfd.zig @@ -0,0 +1,23 @@ +const std = @import("std"); + +fd: std.posix.fd_t, + +pub inline fn init() !@This() { + return .{ + .fd = try std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC), + }; +} + +pub inline fn deinit(self: *@This()) void { + std.posix.close(self.fd); + self.* = undefined; +} + +pub inline fn notify(self: *@This()) void { + _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; +} + +pub inline fn wait(self: *@This()) void { + var v: u64 = undefined; + _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; +} diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig new file mode 100644 index 0000000..87aab18 --- /dev/null +++ b/src/aio/common/posix.zig @@ -0,0 +1,172 @@ +const std = @import("std"); +const Operation = @import("../ops.zig").Operation; + +pub const RENAME_NOREPLACE = 1 << 0; + +pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { + var os_flags: std.posix.O = .{ + .ACCMODE = switch (flags.mode) { + .read_only => .RDONLY, + .write_only => .WRONLY, + .read_write => .RDWR, + }, + }; + if (@hasField(std.posix.O, "CLOEXEC")) os_flags.CLOEXEC = true; + if (@hasField(std.posix.O, "LARGEFILE")) os_flags.LARGEFILE = true; + if (@hasField(std.posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty; + + // Use the O locking flags if the os supports them to acquire the lock + // atomically. + const has_flock_open_flags = @hasField(std.posix.O, "EXLOCK"); + if (has_flock_open_flags) { + // Note that the NONBLOCK flag is removed after the openat() call + // is successful. 
+ switch (flags.lock) { + .none => {}, + .shared => { + os_flags.SHLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + .exclusive => { + os_flags.EXLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + } + } + return os_flags; +} + +pub inline fn statusToTerm(status: u32) std.process.Child.Term { + return if (std.posix.W.IFEXITED(status)) + .{ .Exited = std.posix.W.EXITSTATUS(status) } + else if (std.posix.W.IFSIGNALED(status)) + .{ .Signal = std.posix.W.TERMSIG(status) } + else if (std.posix.W.IFSTOPPED(status)) + .{ .Stopped = std.posix.W.STOPSIG(status) } + else + .{ .Unknown = status }; +} + +pub inline fn perform(op: anytype) !void { + var u_64: u64 align(1) = undefined; + switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { + .fsync => _ = try std.posix.fsync(op.file.handle), + .read => op.out_read.* = try std.posix.pread(op.file.handle, op.buffer, op.offset), + .write => { + const written = try std.posix.pwrite(op.file.handle, op.buffer, op.offset); + if (op.out_written) |w| w.* = written; + }, + .accept => op.out_socket.* = try std.posix.accept(op.socket, op.addr, op.inout_addrlen, 0), + .connect => _ = try std.posix.connect(op.socket, op.addr, op.addrlen), + .recv => op.out_read.* = try std.posix.recv(op.socket, op.buffer, 0), + .send => { + const written = try std.posix.send(op.socket, op.buffer, 0); + if (op.out_written) |w| w.* = written; + }, + .open_at => op.out_file.handle = try std.posix.openatZ(op.dir.fd, op.path, convertOpenFlags(op.flags), 0), + .close_file => std.posix.close(op.file.handle), + .close_dir => std.posix.close(op.dir.fd), + .rename_at => { + const res = std.os.linux.renameat2(op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE); + const e = std.posix.errno(res); + if (e != .SUCCESS) return switch (e) { + .SUCCESS, .INTR, .INVAL, .AGAIN => unreachable, + .CANCELED => error.OperationCanceled, + .ACCES => error.AccessDenied, + .PERM => error.AccessDenied, + .BUSY => error.FileBusy, + .DQUOT => error.DiskQuota, + .FAULT => unreachable, + .ISDIR => error.IsDir, + .LOOP => error.SymLinkLoop, + .MLINK => error.LinkQuotaExceeded, + .NAMETOOLONG => error.NameTooLong, + .NOENT => error.FileNotFound, + .NOTDIR => error.NotDir, + .NOMEM => error.SystemResources, + .NOSPC => error.NoSpaceLeft, + .EXIST => error.PathAlreadyExists, + .NOTEMPTY => error.PathAlreadyExists, + .ROFS => error.ReadOnlyFileSystem, + .XDEV => error.RenameAcrossMountPoints, + else => std.posix.unexpectedErrno(e), + }; + }, + .unlink_at => _ = try std.posix.unlinkatZ(op.dir.fd, op.path, 0), + .mkdir_at => _ = try std.posix.mkdiratZ(op.dir.fd, op.path, op.mode), + .symlink_at => _ = try std.posix.symlinkatZ(op.target, op.dir.fd, op.link_path), + .child_exit => { + _ = std.posix.system.waitid(.PID, op.child, @constCast(&op._), std.posix.W.EXITED); + if (op.out_term) |term| { + term.* = statusToTerm(@intCast(op._.fields.common.second.sigchld.status)); + } + }, + .socket => op.out_socket.* = try std.posix.socket(op.domain, op.flags, op.protocol), + .close_socket => std.posix.close(op.socket), + .notify_event_source => _ = std.posix.write(op.source.native.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable, + .wait_event_source => _ = std.posix.read(op.source.native.fd, std.mem.asBytes(&u_64)) catch unreachable, + .close_event_source => std.posix.close(op.source.native.fd), + // this function is meant for execution on a thread, it makes no sense to execute these on a thread + .timeout, .link_timeout, .cancel => unreachable, + } +} + +pub inline 
fn readinessFd(op: anytype) !std.posix.fd_t { + return switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { + .fsync, .write => 0, + .read => op.file.handle, + .accept, .recv => op.socket, + .socket, .connect, .send => 0, + .open_at, .close_file, .close_dir, .close_socket => 0, + .timeout, .link_timeout => blk: { + const ts: std.os.linux.itimerspec = .{ + .it_value = .{ + .tv_sec = @intCast(op.ns / std.time.ns_per_s), + .tv_nsec = @intCast(op.ns % std.time.ns_per_s), + }, + .it_interval = .{ + .tv_sec = 0, + .tv_nsec = 0, + }, + }; + const fd = std.posix.timerfd_create(std.posix.CLOCK.MONOTONIC, .{ .CLOEXEC = true, .NONBLOCK = true }) catch |err| return switch (err) { + error.AccessDenied => unreachable, + else => |e| e, + }; + errdefer std.posix.close(fd); + _ = std.posix.timerfd_settime(fd, .{}, &ts, null) catch |err| return switch (err) { + error.Canceled, error.InvalidHandle => unreachable, + error.Unexpected => |e| e, + }; + break :blk fd; + }, + .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => 0, + .child_exit => blk: { + const res = std.posix.system.pidfd_open(op.child, @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK"))); + const e = std.posix.errno(res); + if (e != .SUCCESS) return switch (e) { + .INVAL, .SRCH => unreachable, + .MFILE => error.ProcessFdQuotaExceeded, + .NFILE => error.SystemFdQuotaExceeded, + .NODEV => error.NoDevice, + .NOMEM => error.SystemResources, + else => std.posix.unexpectedErrno(e), + }; + break :blk @intCast(res); + }, + .wait_event_source => op.source.native.fd, + .notify_event_source, .close_event_source => 0, + }; +} + +pub inline fn closeReadinessFd(op: anytype, fd: std.posix.fd_t) void { + const needs_close = switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { + .timeout, .link_timeout, .child_exit => true, + .fsync, .read, .write => false, + .socket, .accept, .connect, .recv, .send => false, + .open_at, .close_file, .close_dir, .close_socket => false, + .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => false, + .notify_event_source, .wait_event_source, .close_event_source => false, + }; + if (needs_close) std.posix.close(fd); +} diff --git a/src/aio/common/types.zig b/src/aio/common/types.zig new file mode 100644 index 0000000..97d5a41 --- /dev/null +++ b/src/aio/common/types.zig @@ -0,0 +1,121 @@ +const std = @import("std"); + +pub fn FixedArrayList(T: type, SZ: type) type { + return struct { + items: []T, + len: SZ = 0, + + pub const Error = error{ + OutOfMemory, + }; + + pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { + return .{ .items = try allocator.alloc(T, n) }; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + allocator.free(self.items); + self.* = undefined; + } + + pub fn add(self: *@This(), item: T) Error!void { + if (self.len >= self.items.len) return error.OutOfMemory; + self.items[self.len] = item; + self.len += 1; + } + + pub fn reset(self: *@This()) void { + self.len = 0; + } + }; +} + +pub fn Pool(T: type, SZ: type) type { + return struct { + pub const Node = union(enum) { free: ?SZ, used: T }; + nodes: []Node, + free: ?SZ = null, + num_free: SZ = 0, + num_used: SZ = 0, + + pub const Error = error{ + OutOfMemory, + }; + + pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { + return .{ .nodes = try allocator.alloc(Node, n) }; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + allocator.free(self.nodes); + self.* = undefined; + } + + pub fn empty(self: *@This()) bool { + return self.num_used == 
self.num_free; + } + + pub fn next(self: *@This()) ?SZ { + if (self.free) |fslot| return fslot; + if (self.num_used >= self.nodes.len) return null; + return self.num_used; + } + + pub fn add(self: *@This(), item: T) Error!SZ { + if (self.free) |fslot| { + self.free = self.nodes[fslot].free; + self.nodes[fslot] = .{ .used = item }; + self.num_free -= 1; + return fslot; + } + if (self.num_used >= self.nodes.len) return error.OutOfMemory; + self.nodes[self.num_used] = .{ .used = item }; + defer self.num_used += 1; + return self.num_used; + } + + pub fn remove(self: *@This(), slot: SZ) void { + if (self.free) |fslot| { + self.nodes[slot] = .{ .free = fslot }; + } else { + self.nodes[slot] = .{ .free = null }; + } + self.free = slot; + self.num_free += 1; + } + + pub fn get(self: *@This(), slot: SZ) *T { + return &self.nodes[slot].used; + } + + pub fn reset(self: *@This()) void { + self.free = null; + self.num_free = 0; + self.num_used = 0; + } + + pub const Iterator = struct { + items: []Node, + index: SZ = 0, + + pub const Entry = struct { + k: SZ, + v: *T, + }; + + pub fn next(self: *@This()) ?Entry { + while (self.index < self.items.len) { + defer self.index += 1; + if (self.items[self.index] == .used) { + return .{ .k = self.index, .v = &self.items[self.index].used }; + } + } + return null; + } + }; + + pub fn iterator(self: *@This()) Iterator { + return .{ .items = self.nodes[0..self.num_used] }; + } + }; +} diff --git a/src/aio/fallback.zig b/src/aio/fallback.zig new file mode 100644 index 0000000..0c1d659 --- /dev/null +++ b/src/aio/fallback.zig @@ -0,0 +1,382 @@ +const std = @import("std"); +const aio = @import("../aio.zig"); +const Operation = @import("ops.zig").Operation; +const Pool = @import("common/types.zig").Pool; +const FixedArrayList = @import("common/types.zig").FixedArrayList; +const posix = @import("common/posix.zig"); + +// TODO: timers have to be set when the operation actually starts + +fn debug(comptime fmt: []const u8, args: anytype) void { + if (@import("builtin").is_test) { + std.debug.print("fallback: " ++ fmt ++ "\n", args); + } else { + if (comptime !aio.options.debug) return; + const log = std.log.scoped(.fallback); + log.debug(fmt, args); + } +} + +pub const EventSource = @import("common/eventfd.zig"); + +const Queue = struct { + const Result = struct { failure: Operation.Error, id: u16 }; + ops: Pool(Operation.Union, u16), + next: []u16, + rfd: []std.posix.fd_t, + link_lock: std.DynamicBitSetUnmanaged, + started: std.DynamicBitSetUnmanaged, + pending: std.DynamicBitSetUnmanaged, + finished: FixedArrayList(Result, u16), + finished_mutex: std.Thread.Mutex = .{}, + + pub fn init(allocator: std.mem.Allocator, n: u16) !@This() { + var ops = try Pool(Operation.Union, u16).init(allocator, n); + errdefer ops.deinit(allocator); + const next = try allocator.alloc(u16, n); + errdefer allocator.free(next); + const rfd = try allocator.alloc(std.posix.fd_t, n); + errdefer allocator.free(rfd); + var link_lock = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer link_lock.deinit(allocator); + var started = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer started.deinit(allocator); + var pending = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer pending.deinit(allocator); + var finished = try FixedArrayList(Result, u16).init(allocator, n); + errdefer finished.deinit(allocator); + return .{ + .ops = ops, + .next = next, + .rfd = rfd, + .link_lock = link_lock, + .started = started, + .pending = pending, + .finished = finished, + 
}; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + var iter = self.ops.iterator(); + while (iter.next()) |e| if (self.rfd[e.k] != 0) { + switch (e.v.*) { + inline else => |*op| posix.closeReadinessFd(op, self.rfd[e.k]), + } + }; + self.ops.deinit(allocator); + allocator.free(self.next); + self.link_lock.deinit(allocator); + self.started.deinit(allocator); + self.pending.deinit(allocator); + allocator.free(self.rfd); + self.finished.deinit(allocator); + self.* = undefined; + } + + pub fn add(self: *@This(), uop: Operation.Union, linked_to: ?u16, rfd: std.posix.fd_t) !u16 { + const id = try self.ops.add(uop); + if (linked_to) |ln| { + self.next[ln] = id; + self.link_lock.set(id); + } else { + self.link_lock.unset(id); + } + // to account a mistake where link is set without a next op + self.next[id] = id; + self.rfd[id] = rfd; + self.started.unset(id); + self.pending.unset(id); + switch (uop) { + inline else => |*op| { + if (@hasField(@TypeOf(op.*), "out_id")) { + if (op.out_id) |p_id| p_id.* = @enumFromInt(id); + } + if (op.out_error) |out_error| out_error.* = error.Success; + }, + } + return id; + } + + pub fn remove(self: *@This(), id: u16) void { + if (self.rfd[id] != 0) { + switch (self.ops.nodes[id].used) { + inline else => |*op| posix.closeReadinessFd(op, self.rfd[id]), + } + self.rfd[id] = 0; + } + self.next[id] = id; + self.ops.remove(id); + } + + pub const FinishLock = enum { lock, no_lock }; + + pub fn finish(self: *@This(), res: Result, lock: FinishLock) void { + switch (lock) { + .lock => { + self.finished_mutex.lock(); + defer self.finished_mutex.unlock(); + for (self.finished.items[0..self.finished.len]) |*i| if (i.id == res.id) { + i.* = res; + return; + }; + self.finished.add(res) catch unreachable; + }, + .no_lock => { + for (self.finished.items[0..self.finished.len]) |*i| if (i.id == res.id) { + i.* = res; + return; + }; + self.finished.add(res) catch unreachable; + }, + } + } +}; + +efd: std.posix.fd_t, +tpool: *std.Thread.Pool, +sq: Queue, +pfd: FixedArrayList(std.posix.pollfd, u32), +prev_id: ?u16 = null, // for linking operations + +pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { + const efd = try std.posix.eventfd(0, std.os.linux.EFD.NONBLOCK | std.os.linux.EFD.CLOEXEC); + errdefer std.posix.close(efd); + var tpool = try allocator.create(std.Thread.Pool); + tpool.init(.{ .allocator = allocator, .n_jobs = aio.options.num_threads }) catch |err| return switch (err) { + error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources, + else => |e| e, + }; + var sq = try Queue.init(allocator, n); + errdefer sq.deinit(allocator); + var pfd = try FixedArrayList(std.posix.pollfd, u32).init(allocator, n + 1); + errdefer pfd.deinit(allocator); + return .{ .efd = efd, .tpool = tpool, .sq = sq, .pfd = pfd }; +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + self.tpool.deinit(); + allocator.destroy(self.tpool); + self.sq.deinit(allocator); + self.pfd.deinit(allocator); + std.posix.close(self.efd); + self.* = undefined; +} + +pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.Error!void { + var ids: std.BoundedArray(u16, len) = .{}; + errdefer for (ids.constSlice()) |id| self.sq.remove(id); + inline for (&work.ops) |*op| { + const tag = @tagName(comptime Operation.tagFromPayloadType(@TypeOf(op.*))); + const uop = @unionInit(Operation.Union, tag, op.*); + const id = try self.sq.add(uop, self.prev_id, try posix.readinessFd(op)); + debug("queue: {}: {}", .{ id, 
std.meta.activeTag(uop) }); + ids.append(id) catch unreachable; + if (op.link != .unlinked) self.prev_id = id else self.prev_id = null; + } +} + +fn finish(self: *@This(), id: u16, failure: Operation.Error, lock: Queue.FinishLock) void { + self.sq.started.unset(id); + self.sq.pending.unset(id); + self.sq.link_lock.set(id); // prevent restarting + self.sq.finish(.{ .id = id, .failure = failure }, lock); + _ = std.posix.write(self.efd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; +} + +fn executor(self: *@This(), uop: Operation.Union, id: u16) void { + var failure: Operation.Error = error.Success; + defer self.finish(id, failure, .lock); + switch (uop) { + inline else => |*op| { + posix.perform(op) catch |op_err| { + failure = op_err; + }; + }, + } +} + +fn cancel(self: *@This(), id: u16, lock: Queue.FinishLock) enum { in_progress, not_found, ok } { + if (self.sq.started.isSet(id) and !self.sq.pending.isSet(id)) { + return .in_progress; + } + if (self.sq.ops.nodes[id] != .used) { + return .not_found; + } + // collect the result later + self.finish(id, error.OperationCanceled, lock); + return .ok; +} + +fn start(self: *@This(), id: u16) !void { + if (self.sq.link_lock.isSet(id)) return; // previous op hasn't finished yet + + self.sq.started.set(id); + if (self.sq.rfd[id] == 0 or self.sq.pending.isSet(id)) { + if (self.sq.next[id] != id) { + debug("perform: {}: {} => {}", .{ id, std.meta.activeTag(self.sq.ops.nodes[id].used), self.sq.next[id] }); + } else { + debug("perform: {}: {}", .{ id, std.meta.activeTag(self.sq.ops.nodes[id].used) }); + } + switch (self.sq.ops.nodes[id].used) { + .cancel => |op| { + switch (self.cancel(@intCast(@intFromEnum(op.id)), .lock)) { + .ok => self.finish(id, error.Success, .lock), + .in_progress => self.finish(id, error.InProgress, .lock), + .not_found => self.finish(id, error.NotFound, .lock), + } + }, + .timeout => self.finish(id, error.Success, .lock), + .link_timeout => { + var iter = self.sq.ops.iterator(); + const res = blk: { + while (iter.next()) |e| { + if (e.k != id and self.sq.next[e.k] == id) { + const res = self.cancel(e.k, .lock); + // invalidate child's next since we expired first + if (res == .ok) self.sq.next[e.k] = e.k; + break :blk res; + } + } + break :blk .not_found; + }; + if (res == .ok) { + self.finish(id, error.Expired, .lock); + } else { + self.finish(id, error.Success, .lock); + } + }, + else => { + self.sq.pending.unset(id); + self.tpool.spawn(executor, .{ self, self.sq.ops.nodes[id].used, id }) catch return error.SystemResources; + }, + } + } else { + // pending for readiness, starts later + if (self.sq.next[id] != id) { + debug("pending: {}: {} => {}", .{ id, std.meta.activeTag(self.sq.ops.nodes[id].used), self.sq.next[id] }); + } else { + debug("pending: {}: {}", .{ id, std.meta.activeTag(self.sq.ops.nodes[id].used) }); + } + self.sq.pending.set(id); + } + + // we need to start linked timeout immediately as well if there's one + if (self.sq.next[id] != id and self.sq.ops.nodes[self.sq.next[id]].used == .link_timeout) { + self.sq.link_lock.unset(self.sq.next[id]); + } +} + +fn submit(self: *@This()) !bool { + if (self.sq.ops.empty()) return false; + defer self.prev_id = null; + self.pfd.add(.{ .fd = self.efd, .events = std.posix.POLL.IN, .revents = 0 }) catch unreachable; + var iter = self.sq.ops.iterator(); + while (iter.next()) |e| { + if (!self.sq.started.isSet(e.k)) { + try self.start(e.k); + } + if (self.sq.pending.isSet(e.k)) { + std.debug.assert(self.sq.rfd[e.k] != 0); + self.pfd.add(.{ + .fd = self.sq.rfd[e.k], + 
.events = std.posix.POLL.IN, + .revents = 0, + }) catch unreachable; + } + } + return true; +} + +pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.Error!aio.CompletionResult { + if (!try self.submit()) return .{}; + defer self.pfd.reset(); + + // TODO: use kqueue and epoll instead + _ = std.posix.poll(self.pfd.items[0..self.pfd.len], if (mode == .blocking) -1 else 0) catch |err| return switch (err) { + error.NetworkSubsystemFailed => unreachable, + else => |e| e, + }; + + var num_errors: u16 = 0; + var num_completed: u16 = 0; + for (self.pfd.items[0..self.pfd.len]) |pfd| { + if (pfd.revents & std.posix.POLL.IN == 0) continue; + if (pfd.fd == self.efd) { + var trash: u64 align(1) = undefined; + _ = std.posix.read(self.efd, std.mem.asBytes(&trash)) catch {}; + self.sq.finished_mutex.lock(); + defer self.sq.finished_mutex.unlock(); + defer self.sq.finished.reset(); + var last_len: u16 = 0; + while (last_len != self.sq.finished.len) { + const start_idx = last_len; + last_len = self.sq.finished.len; + for (self.sq.finished.items[start_idx..self.sq.finished.len]) |res| { + if (res.failure != error.Success) { + debug("complete: {}: {} [FAIL] {}", .{ res.id, std.meta.activeTag(self.sq.ops.nodes[res.id].used), res.failure }); + } else { + debug("complete: {}: {} [OK]", .{ res.id, std.meta.activeTag(self.sq.ops.nodes[res.id].used) }); + } + defer self.sq.remove(res.id); + if (self.sq.ops.nodes[res.id].used == .link_timeout and res.failure == error.OperationCanceled) { + // special case + } else { + num_errors += @intFromBool(res.failure != error.Success); + } + switch (self.sq.ops.nodes[res.id].used) { + inline else => |*op| { + switch (op.counter) { + .dec => |c| c.* -= 1, + .inc => |c| c.* += 1, + .nop => {}, + } + if (@hasField(@TypeOf(op.*), "out_id")) { + if (op.out_error) |err| err.* = @errorCast(res.failure); + } + if (op.link != .unlinked and self.sq.next[res.id] != res.id) { + if (self.sq.ops.nodes[self.sq.next[res.id]].used == .link_timeout) { + switch (op.link) { + .unlinked => unreachable, + .soft => std.debug.assert(self.cancel(self.sq.next[res.id], .no_lock) == .ok), + .hard => self.finish(self.sq.next[res.id], error.Success, .no_lock), + } + } else if (res.failure != error.Success and op.link == .soft) { + _ = self.cancel(self.sq.next[res.id], .no_lock); + } else { + self.sq.link_lock.unset(self.sq.next[res.id]); + } + } + }, + } + } + } + num_completed = self.sq.finished.len; + } else { + var iter = self.sq.ops.iterator(); + while (iter.next()) |e| if (pfd.fd == self.sq.rfd[e.k]) { + // canceled + if (!self.sq.pending.isSet(e.k)) continue; + // reset started bit, the operation will spawn next cycle + self.sq.started.unset(e.k); + }; + } + } + + return .{ .num_completed = num_completed, .num_errors = num_errors }; +} + +pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 { + var mem: [len * 1024]u8 = undefined; + var fba = std.heap.FixedBufferAllocator.init(&mem); + var wrk = try init(fba.allocator(), len); + defer wrk.deinit(fba.allocator()); + try wrk.queue(len, work); + var n: u16 = len; + var num_errors: u16 = 0; + while (n > 0) { + const res = try wrk.complete(.blocking); + n -= res.num_completed; + num_errors += res.num_errors; + } + return num_errors; +} diff --git a/src/aio/linux.zig b/src/aio/io_uring.zig similarity index 74% rename from src/aio/linux.zig rename to src/aio/io_uring.zig index e6f7987..ef4329b 100644 --- a/src/aio/linux.zig +++ b/src/aio/io_uring.zig @@ -1,41 +1,26 @@ const std = @import("std"); const aio = 
@import("../aio.zig"); const Operation = @import("ops.zig").Operation; +const Pool = @import("common/types.zig").Pool; +const posix = @import("common/posix.zig"); -pub const EventSource = struct { - fd: std.posix.fd_t, - - pub inline fn init() !@This() { - return .{ - .fd = std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC) catch |err| return switch (err) { - error.SystemResources => error.SystemResources, - error.ProcessFdQuotaExceeded => error.ProcessQuotaExceeded, - error.SystemFdQuotaExceeded => error.SystemQuotaExceeded, - error.Unexpected => error.Unexpected, - }, - }; - } - - pub inline fn deinit(self: *@This()) void { - std.posix.close(self.fd); - self.* = undefined; - } - - pub inline fn notify(self: *@This()) void { - _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; +fn debug(comptime fmt: []const u8, args: anytype) void { + if (@import("builtin").is_test) { + std.debug.print("io_uring: " ++ fmt ++ "\n", args); + } else { + if (comptime !aio.options.debug) return; + const log = std.log.scoped(.io_uring); + log.debug(fmt, args); } +} - pub inline fn wait(self: *@This()) void { - var v: u64 = undefined; - _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; - } -}; +pub const EventSource = @import("common/eventfd.zig"); io: std.os.linux.IoUring, ops: Pool(Operation.Union, u16), -pub fn init(allocator: std.mem.Allocator, n: u16) aio.InitError!@This() { - const n2 = std.math.ceilPowerOfTwo(u16, n) catch return error.SystemQuotaExceeded; +pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { + const n2 = std.math.ceilPowerOfTwo(u16, n) catch unreachable; var io = try uring_init(n2); errdefer io.deinit(); const ops = try Pool(Operation.Union, u16).init(allocator, n2); @@ -49,14 +34,14 @@ pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { self.* = undefined; } -inline fn queueOperation(self: *@This(), op: anytype) aio.QueueError!u16 { +inline fn queueOperation(self: *@This(), op: anytype) aio.Error!u16 { const n = self.ops.next() orelse return error.OutOfMemory; try uring_queue(&self.io, op, n); const tag = @tagName(comptime Operation.tagFromPayloadType(@TypeOf(op.*))); return self.ops.add(@unionInit(Operation.Union, tag, op.*)) catch unreachable; } -pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.QueueError!void { +pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.Error!void { if (comptime len == 1) { _ = try self.queueOperation(&work.ops[0]); } else { @@ -66,20 +51,22 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.QueueError!vo } } -const NOP = std.math.maxInt(usize); +pub const NOP = std.math.maxInt(u64); -pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.CompletionError!aio.CompletionResult { - if ((!self.ops.empty() or self.io.sq_ready() > 0) and mode == .nonblocking) { +pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.Error!aio.CompletionResult { + if (self.ops.empty() and self.io.sq_ready() == 0) return .{}; + if (mode == .nonblocking) { _ = self.io.nop(NOP) catch |err| return switch (err) { error.SubmissionQueueFull => .{}, }; } - if (try uring_submit(&self.io) == 0) return .{}; + _ = try uring_submit(&self.io); var result: aio.CompletionResult = .{}; var cqes: [64]std.os.linux.io_uring_cqe = undefined; const n = try uring_copy_cqes(&self.io, &cqes, 1); for (cqes[0..n]) |*cqe| { if (cqe.user_data == NOP) continue; + defer self.ops.remove(@intCast(cqe.user_data)); switch 
(self.ops.get(@intCast(cqe.user_data)).*) { inline else => |op| uring_handle_completion(&op, cqe) catch { result.num_errors += 1; @@ -90,8 +77,8 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode) aio.Completion return result; } -pub fn immediate(comptime len: u16, work: anytype) aio.ImmediateError!u16 { - var io = try uring_init(std.math.ceilPowerOfTwo(u16, len) catch return error.SystemQuotaExceeded); +pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 { + var io = try uring_init(std.math.ceilPowerOfTwo(u16, len) catch unreachable); defer io.deinit(); inline for (&work.ops, 0..) |*op, idx| try uring_queue(&io, op, idx); var num = try uring_submit(&io); @@ -111,62 +98,32 @@ pub fn immediate(comptime len: u16, work: anytype) aio.ImmediateError!u16 { return num_errors; } -inline fn uring_init(n: u16) aio.InitError!std.os.linux.IoUring { +inline fn uring_init(n: u16) aio.Error!std.os.linux.IoUring { return std.os.linux.IoUring.init(n, 0) catch |err| switch (err) { - error.PermissionDenied, error.SystemResources, error.SystemOutdated => |e| e, - error.ProcessFdQuotaExceeded => error.ProcessQuotaExceeded, - error.SystemFdQuotaExceeded => error.SystemQuotaExceeded, + error.PermissionDenied, + error.SystemResources, + error.SystemOutdated, + error.ProcessFdQuotaExceeded, + error.SystemFdQuotaExceeded, + => |e| e, else => error.Unexpected, }; } -fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { - var os_flags: std.posix.O = .{ - .ACCMODE = switch (flags.mode) { - .read_only => .RDONLY, - .write_only => .WRONLY, - .read_write => .RDWR, - }, - }; - if (@hasField(std.posix.O, "CLOEXEC")) os_flags.CLOEXEC = true; - if (@hasField(std.posix.O, "LARGEFILE")) os_flags.LARGEFILE = true; - if (@hasField(std.posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty; - - // Use the O locking flags if the os supports them to acquire the lock - // atomically. - const has_flock_open_flags = @hasField(std.posix.O, "EXLOCK"); - if (has_flock_open_flags) { - // Note that the NONBLOCK flag is removed after the openat() call - // is successful. 
- switch (flags.lock) { - .none => {}, - .shared => { - os_flags.SHLOCK = true; - os_flags.NONBLOCK = flags.lock_nonblocking; - }, - .exclusive => { - os_flags.EXLOCK = true; - os_flags.NONBLOCK = flags.lock_nonblocking; - }, - } - } - return os_flags; -} - -inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) aio.QueueError!void { +inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) aio.Error!void { + debug("queue: {}: {}", .{ user_data, comptime Operation.tagFromPayloadType(@TypeOf(op.*)) }); const Trash = struct { var u_64: u64 align(1) = undefined; }; - const RENAME_NOREPLACE = 1 << 0; var sqe = switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { .fsync => try io.fsync(user_data, op.file.handle, 0), .read => try io.read(user_data, op.file.handle, .{ .buffer = op.buffer }, op.offset), .write => try io.write(user_data, op.file.handle, op.buffer, op.offset), - .accept => try io.accept(user_data, op.socket, @ptrCast(op.addr), op.inout_addrlen, 0), - .connect => try io.connect(user_data, op.socket, @ptrCast(op.addr), op.addrlen), + .accept => try io.accept(user_data, op.socket, op.addr, op.inout_addrlen, 0), + .connect => try io.connect(user_data, op.socket, op.addr, op.addrlen), .recv => try io.recv(user_data, op.socket, .{ .buffer = op.buffer }, 0), .send => try io.send(user_data, op.socket, op.buffer, 0), - .open_at => try io.openat(user_data, op.dir.fd, op.path, convertOpenFlags(op.flags), 0), + .open_at => try io.openat(user_data, op.dir.fd, op.path, posix.convertOpenFlags(op.flags), 0), .close_file => try io.close(user_data, op.file.handle), .close_dir => try io.close(user_data, op.dir.fd), .timeout => blk: { @@ -181,11 +138,10 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai .tv_sec = @intCast(op.ns / std.time.ns_per_s), .tv_nsec = @intCast(op.ns % std.time.ns_per_s), }; - if (op.out_expired) |expired| expired.* = false; break :blk try io.link_timeout(user_data, &ts, 0); }, .cancel => try io.cancel(user_data, @intFromEnum(op.id), 0), - .rename_at => try io.renameat(user_data, op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, RENAME_NOREPLACE), + .rename_at => try io.renameat(user_data, op.old_dir.fd, op.old_path, op.new_dir.fd, op.new_path, posix.RENAME_NOREPLACE), .unlink_at => try io.unlinkat(user_data, op.dir.fd, op.path, 0), .mkdir_at => try io.mkdirat(user_data, op.dir.fd, op.path, op.mode), .symlink_at => try io.symlinkat(user_data, op.target, op.dir.fd, op.link_path), @@ -196,14 +152,18 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai .wait_event_source => try io.read(user_data, op.source.native.fd, .{ .buffer = std.mem.asBytes(&Trash.u_64) }, 0), .close_event_source => try io.close(user_data, op.source.native.fd), }; - if (op.link_next) sqe.flags |= std.os.linux.IOSQE_IO_LINK; + switch (op.link) { + .unlinked => {}, + .soft => sqe.flags |= std.os.linux.IOSQE_IO_LINK, + .hard => sqe.flags |= std.os.linux.IOSQE_IO_HARDLINK, + } if (@hasField(@TypeOf(op.*), "out_id")) { if (op.out_id) |id| id.* = @enumFromInt(user_data); } if (op.out_error) |out_error| out_error.* = error.Success; } -inline fn uring_submit(io: *std.os.linux.IoUring) aio.CompletionError!u16 { +inline fn uring_submit(io: *std.os.linux.IoUring) aio.Error!u16 { while (true) { const n = io.submit() catch |err| switch (err) { error.FileDescriptorInvalid => unreachable, @@ -211,14 +171,15 @@ inline fn uring_submit(io: *std.os.linux.IoUring) aio.CompletionError!u16 { 
             error.BufferInvalid => unreachable,
             error.OpcodeNotSupported => unreachable,
             error.RingShuttingDown => unreachable,
+            error.SubmissionQueueEntryInvalid => unreachable,
             error.SignalInterrupt => continue,
-            error.CompletionQueueOvercommitted, error.SubmissionQueueEntryInvalid, error.Unexpected, error.SystemResources => |e| return e,
+            error.CompletionQueueOvercommitted, error.Unexpected, error.SystemResources => |e| return e,
         };
         return @intCast(n);
     }
 }
 
-inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16) aio.CompletionError!u16 {
+inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16) aio.Error!u16 {
     while (true) {
         const n = io.copy_cqes(cqes, len) catch |err| switch (err) {
             error.FileDescriptorInvalid => unreachable,
@@ -226,25 +187,15 @@ inline fn uring_copy_cqes(io: *std.os.linux.IoUri
             error.BufferInvalid => unreachable,
             error.OpcodeNotSupported => unreachable,
             error.RingShuttingDown => unreachable,
+            error.SubmissionQueueEntryInvalid => unreachable,
             error.SignalInterrupt => continue,
-            error.CompletionQueueOvercommitted, error.SubmissionQueueEntryInvalid, error.Unexpected, error.SystemResources => |e| return e,
+            error.CompletionQueueOvercommitted, error.Unexpected, error.SystemResources => |e| return e,
         };
         return @intCast(n);
     }
     unreachable;
 }
 
-fn statusToTerm(status: u32) std.process.Child.Term {
-    return if (std.posix.W.IFEXITED(status))
-        .{ .Exited = std.posix.W.EXITSTATUS(status) }
-    else if (std.posix.W.IFSIGNALED(status))
-        .{ .Signal = std.posix.W.TERMSIG(status) }
-    else if (std.posix.W.IFSTOPPED(status))
-        .{ .Stopped = std.posix.W.STOPSIG(status) }
-    else
-        .{ .Unknown = status };
-}
-
 inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe) !void {
     switch (op.counter) {
         .dec => |c| c.* -= 1,
@@ -398,12 +349,8 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
             },
             .link_timeout => switch (err) {
                 .SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
-                .TIME => blk: {
-                    if (op.out_expired) |expired| expired.* = true;
-                    break :blk error.Success;
-                },
-                .ALREADY => error.Success,
-                .CANCELED => error.OperationCanceled,
+                .TIME => error.Expired,
+                .ALREADY, .CANCELED => error.OperationCanceled,
                 else => unreachable,
             },
             .cancel => switch (err) {
@@ -510,10 +457,21 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
                 else => std.posix.unexpectedErrno(err),
             },
         };
+
         if (op.out_error) |out_error| out_error.* = res;
-        if (res != error.Success) return error.OperationFailed;
+
+        if (res != error.Success) {
+            if ((comptime Operation.tagFromPayloadType(@TypeOf(op.*)) == .link_timeout) and res == error.OperationCanceled) {
+                // special case
+            } else {
+                debug("complete: {}: {} [FAIL] {}", .{ cqe.user_data, comptime Operation.tagFromPayloadType(@TypeOf(op.*)), res });
+                return error.OperationFailed;
+            }
+        }
     }
 
+    debug("complete: {}: {} [OK]", .{ cqe.user_data, comptime Operation.tagFromPayloadType(@TypeOf(op.*)) });
+
     switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
         .fsync => {},
         .read => op.out_read.* = @intCast(cqe.res),
@@ -533,87 +491,8 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
         .cancel => {},
         .rename_at, .unlink_at, .mkdir_at, .symlink_at => {},
         .child_exit => if (op.out_term) |term| {
-            term.* = statusToTerm(@intCast(op._.fields.common.second.sigchld.status));
+            term.* = posix.statusToTerm(@intCast(op._.fields.common.second.sigchld.status));
         },
         .socket => op.out_socket.* = cqe.res,
     }
 }
-
-pub fn Pool(T: type, SZ: type) type {
-    return struct {
-        pub const Node = union(enum) { free: ?SZ, used: T };
-        nodes: []Node,
-        free: ?SZ = null,
-        num_free: SZ = 0,
-        num_used: SZ = 0,
-
-        pub const Error = error{
-            OutOfMemory,
-        };
-
-        pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
-            return .{ .nodes = try allocator.alloc(Node, n) };
-        }
-
-        pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
-            allocator.free(self.nodes);
-            self.* = undefined;
-        }
-
-        pub fn empty(self: *@This()) bool {
-            return self.num_used == self.num_free;
-        }
-
-        pub fn next(self: *@This()) ?SZ {
-            if (self.free) |fslot| return fslot;
-            if (self.num_used >= self.nodes.len) return null;
-            return self.num_used;
-        }
-
-        pub fn add(self: *@This(), item: T) Error!SZ {
-            if (self.free) |fslot| {
-                self.free = self.nodes[fslot].free;
-                self.nodes[fslot] = .{ .used = item };
-                self.num_free -= 1;
-                return fslot;
-            }
-            if (self.num_used >= self.nodes.len) return error.OutOfMemory;
-            self.nodes[self.num_used] = .{ .used = item };
-            defer self.num_used += 1;
-            return self.num_used;
-        }
-
-        pub fn remove(self: *@This(), slot: SZ) void {
-            if (self.free) |fslot| {
-                self.nodes[slot] = .{ .free = fslot };
-            } else {
-                self.nodes[slot] = .{ .free = null };
-            }
-            self.free = slot;
-            self.num_free += 1;
-        }
-
-        pub fn get(self: *@This(), slot: SZ) *T {
-            return &self.nodes[slot].used;
-        }
-
-        pub const Iterator = struct {
-            items: []Node,
-            index: SZ = 0,
-
-            pub fn next(self: *@This()) *T {
-                while (self.index < self.items.len) {
-                    defer self.index += 1;
-                    if (self.items[self.index] == .used) {
-                        return &self.items[self.index].used;
-                    }
-                }
-                return null;
-            }
-        };
-
-        pub fn iterator(self: *@This()) Iterator {
-            return .{ .items = self.nodes[0..self.num_used] };
-        }
-    };
-}
diff --git a/src/aio/ops.zig b/src/aio/ops.zig
index 42a4a69..48615f5 100644
--- a/src/aio/ops.zig
+++ b/src/aio/ops.zig
@@ -6,6 +6,14 @@ const aio = @import("../aio.zig");
 
 pub const Id = enum(usize) { _ };
 
+pub const Link = enum {
+    unlinked,
+    /// If the operation fails the next operation in the chain will fail as well
+    soft,
+    /// If the operation fails, the failure is ignored and next operation is started regardless
+    hard,
+};
+
 // Counter that either increases or decreases a value in a given address
 // Reserved when using the coroutines API
 const Counter = union(enum) {
@@ -26,12 +34,12 @@ pub const Fsync = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.File.read
 pub const Read = struct {
-    pub const Error = std.fs.File.ReadError || SharedError;
+    pub const Error = std.posix.PReadError || SharedError;
     file: std.fs.File,
     buffer: []u8,
     offset: u64 = 0,
@@ -39,12 +47,12 @@ pub const Read = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.File.write
 pub const Write = struct {
-    pub const Error = std.fs.File.WriteError || SharedError;
+    pub const Error = std.posix.PWriteError || SharedError;
     file: std.fs.File,
     buffer: []const u8,
     offset: u64 = 0,
@@ -52,7 +60,7 @@ pub const Write = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.accept
@@ -65,7 +73,7 @@ pub const Accept = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.connect
@@ -77,7 +85,7 @@ pub const Connect = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.recv
@@ -89,7 +97,7 @@ pub const Recv = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.send
@@ -101,7 +109,7 @@ pub const Send = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 // TODO: recvmsg, sendmsg
@@ -116,7 +124,7 @@ pub const OpenAt = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.File.close
@@ -126,7 +134,7 @@ pub const CloseFile = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.Dir.Close
@@ -136,7 +144,7 @@ pub const CloseDir = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.time.Timer.start
@@ -146,20 +154,19 @@ pub const Timeout = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// Timeout linked to a operation
-/// This must be linked last and the operation before must have set `link_next` to `true`
+/// This must be linked last and the operation before must have set `link` to either `soft` or `hard`
 /// If the operation finishes before the timeout the timeout will be canceled
 pub const LinkTimeout = struct {
-    pub const Error = SharedError;
+    pub const Error = error{Expired} || SharedError;
     ns: u128,
-    out_expired: ?*bool = null,
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// Cancel a operation
@@ -168,7 +175,7 @@ pub const Cancel = struct {
     id: Id,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.rename
@@ -181,7 +188,7 @@ pub const RenameAt = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.Dir.deleteFile
@@ -192,7 +199,7 @@ pub const UnlinkAt = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.Dir.makeDir
@@ -204,7 +211,7 @@ pub const MkDirAt = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.fs.Dir.symlink
@@ -216,7 +223,7 @@ pub const SymlinkAt = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 // TODO: linkat
@@ -227,13 +234,13 @@ pub const ChildExit = struct {
     child: std.process.Child.Id,
     out_term: ?*std.process.Child.Term = null,
     _: switch (builtin.target.os.tag) {
-        .linux => std.os.linux.siginfo_t,
-        else => @compileError("unsupported os"),
+        .windows => @compileError("unsupported os"),
+        else => std.posix.siginfo_t,
     } = undefined,
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.socket
@@ -249,7 +256,7 @@ pub const Socket = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 /// std.posix.close
@@ -259,7 +266,7 @@ pub const CloseSocket = struct {
     out_id: ?*Id = null,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 pub const NotifyEventSource = struct {
@@ -267,7 +274,7 @@ pub const NotifyEventSource = struct {
     source: aio.EventSource,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 pub const WaitEventSource = struct {
@@ -275,7 +282,7 @@ pub const WaitEventSource = struct {
     source: aio.EventSource,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 pub const CloseEventSource = struct {
@@ -283,7 +290,7 @@ pub const CloseEventSource = struct {
     source: aio.EventSource,
     out_error: ?*Error = null,
     counter: Counter = .nop,
-    link_next: bool = false,
+    link: Link = .unlinked,
 };
 
 pub const Operation = enum {
@@ -363,4 +370,10 @@
             },
         });
     };
+
+    pub const Error = blk: {
+        var set = error{};
+        for (Operation.map.values) |v| set = set || v.Error;
+        break :blk set;
+    };
 };
diff --git a/src/coro.zig b/src/coro.zig
index d3af3f2..73c9936 100644
--- a/src/coro.zig
+++ b/src/coro.zig
@@ -29,13 +29,17 @@ fn defaultErrorHandler(err: anyerror) void {
 }
 
 fn debug(comptime fmt: []const u8, args: anytype) void {
-    if (comptime !options.debug) return;
-    const log = std.log.scoped(.coro);
-    log.debug(fmt, args);
+    if (@import("builtin").is_test) {
+        std.debug.print("coro: " ++ fmt ++ "\n", args);
+    } else {
+        if (comptime !options.debug) return;
+        const log = std.log.scoped(.coro);
+        log.debug(fmt, args);
+    }
 }
 
 pub const io = struct {
-    inline fn privateComplete(operations: anytype, yield_state: YieldState) aio.ImmediateError!u16 {
+    inline fn privateComplete(operations: anytype, yield_state: YieldState) aio.Error!u16 {
         if (Fiber.current()) |fiber| {
             var task: *TaskState = @ptrFromInt(fiber.getUserDataPtr().*);
 
@@ -82,20 +86,20 @@ pub const io = struct {
     /// The IO operations can be cancelled by calling `wakeup`
     /// For error handling you must check the `out_error` field in the operation
    /// Returns the number of errors occured, 0 if there were no errors
-    pub inline fn complete(operations: anytype) aio.ImmediateError!u16 {
+    pub inline fn complete(operations: anytype) aio.Error!u16 {
         return privateComplete(operations, .io);
     }
 
     /// Completes a list of operations immediately, blocks until complete
     /// The IO operations can be cancelled by calling `wakeupFromIo`, or doing `aio.Cancel`
     /// Returns `error.SomeOperationFailed` if any operation failed
-    pub inline fn multi(operations: anytype) (aio.ImmediateError || error{SomeOperationFailed})!void {
+    pub inline fn multi(operations: anytype) (aio.Error || error{SomeOperationFailed})!void {
         if (try complete(operations) > 0) return error.SomeOperationFailed;
     }
 
     /// Completes a single operation immediately, blocks the coroutine until complete
     /// The IO operation can be cancelled by calling `wakeupFromIo`, or doing `aio.Cancel`
-    pub inline fn single(operation: anytype) (aio.ImmediateError || @TypeOf(operation).Error)!void {
+    pub inline fn single(operation: anytype) (aio.Error || @TypeOf(operation).Error)!void {
         var op: @TypeOf(operation) = operation;
         var err: @TypeOf(operation).Error = error.Success;
         op.out_error = &err;
@@ -140,6 +144,14 @@ const YieldState = enum(u8) {
     io_waiting_thread, // cannot be canceled
     io_cancel,
     _, // fields after are reserved for custom use
+
+    pub fn format(self: @This(), comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
+        if (@intFromEnum(self) < std.meta.fields(@This()).len) {
+            try writer.writeAll(@tagName(self));
+        } else {
+            try writer.print("custom {}", .{@intFromEnum(self)});
+        }
+    }
 };
 
 inline fn privateYield(state: YieldState) void {
@@ -156,7 +168,7 @@ inline fn privateYield(state: YieldState) void {
 
 inline fn privateWakeup(task: *TaskState, state: YieldState) void {
     if (task.yield_state != state) return;
-    debug("waking up from yield: {}", .{task});
+    debug("waking up: {}", .{task});
     task.yield_state = .not_yielding;
     task.fiber.switchTo();
 }
@@ -171,9 +183,9 @@ const TaskState = struct {
 
     pub fn format(self: @This(), comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
         if (self.io_counter > 0) {
-            try writer.print("{x}: {s}, {} ops left", .{ @intFromPtr(self.fiber), @tagName(self.yield_state), self.io_counter });
+            try writer.print("{x}: {}, {} ops left", .{ @intFromPtr(self.fiber), self.yield_state, self.io_counter });
         } else {
-            try writer.print("{x}: {s}", .{ @intFromPtr(self.fiber), @tagName(self.yield_state) });
+            try writer.print("{x}: {}", .{ @intFromPtr(self.fiber), self.yield_state });
         }
     }
 
@@ -367,7 +379,7 @@ pub const ThreadPool = struct {
         SystemQuotaExceeded,
         Unexpected,
         SomeOperationFailed,
-    } || aio.ImmediateError || aio.WaitEventSource.Error;
+    } || aio.Error || aio.WaitEventSource.Error;
 
     fn ReturnType(comptime Func: type) type {
         const base = @typeInfo(Func).Fn.return_type.?;
@@ -385,7 +397,7 @@ pub const ThreadPool = struct {
         try self.pool.spawn(entrypoint, .{ &source, func, &ret, args });
         var wait_err: aio.WaitEventSource.Error = error.Success;
         if (try io.privateComplete(.{
-            aio.WaitEventSource{ .source = source, .link_next = true, .out_error = &wait_err },
+            aio.WaitEventSource{ .source = source, .link = .soft, .out_error = &wait_err },
             aio.CloseEventSource{ .source = source },
         }, .io_waiting_thread) > 0) {
             if (wait_err != error.Success) {
@@ -425,11 +437,11 @@ test "ThreadPool" {
             try std.testing.expectEqual(task1_done.*, true);
         }
     };
+    var scheduler = try Scheduler.init(std.testing.allocator, .{});
+    defer scheduler.deinit();
     var pool: ThreadPool = .{};
     defer pool.deinit();
     try pool.start(std.testing.allocator, 0);
-    var scheduler = try Scheduler.init(std.testing.allocator, .{});
-    defer scheduler.deinit();
     var task1_done: bool = false;
     const task1 = try scheduler.spawn(Test.task1, .{&task1_done}, .{});
     _ = try scheduler.spawn(Test.task2, .{ task1, &pool, &task1_done }, .{});