From f8ee6d23668e3d27d1b042f4e65c73c068865058 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Wed, 19 Jun 2024 21:09:08 +0900 Subject: [PATCH] coro: handle corner case with yieldForCompletition --- src/aio.zig | 4 ++++ src/aio/linux.zig | 5 +++++ src/coro.zig | 30 +++++++++++++++++------------- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/aio.zig b/src/aio.zig index f4643c9..f6f8ddb 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -123,6 +123,10 @@ pub const EventSource = struct { pub inline fn notify(self: *@This()) void { self.native.notify(); } + + pub inline fn wait(self: *@This()) void { + self.native.wait(); + } }; const IO = switch (@import("builtin").target.os.tag) { diff --git a/src/aio/linux.zig b/src/aio/linux.zig index f8e77b6..e6f7987 100644 --- a/src/aio/linux.zig +++ b/src/aio/linux.zig @@ -24,6 +24,11 @@ pub const EventSource = struct { pub inline fn notify(self: *@This()) void { _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; } + + pub inline fn wait(self: *@This()) void { + var v: u64 = undefined; + _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; + } }; io: std.os.linux.IoUring, diff --git a/src/coro.zig b/src/coro.zig index f42f7b7..d3af3f2 100644 --- a/src/coro.zig +++ b/src/coro.zig @@ -379,21 +379,25 @@ pub const ThreadPool = struct { /// Yield until `func` finishes on another thread pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnType(@TypeOf(func)) { - if (Fiber.current()) |_| { - var ret: @typeInfo(@TypeOf(func)).Fn.return_type.? = undefined; - var source = try aio.EventSource.init(); - errdefer source.deinit(); - try self.pool.spawn(entrypoint, .{ &source, func, &ret, args }); - if (try io.privateComplete(.{ - aio.WaitEventSource{ .source = source, .link_next = true }, - aio.CloseEventSource{ .source = source }, - }, .io_waiting_thread) > 0) { - return error.SomeOperationFailed; + var ret: @typeInfo(@TypeOf(func)).Fn.return_type.? = undefined; + var source = try aio.EventSource.init(); + errdefer source.deinit(); + try self.pool.spawn(entrypoint, .{ &source, func, &ret, args }); + var wait_err: aio.WaitEventSource.Error = error.Success; + if (try io.privateComplete(.{ + aio.WaitEventSource{ .source = source, .link_next = true, .out_error = &wait_err }, + aio.CloseEventSource{ .source = source }, + }, .io_waiting_thread) > 0) { + if (wait_err != error.Success) { + // it's possible to end up here if aio implementation ran out of resources + // in case of io_uring the application managed to fill up the submission queue + // normally this should not happen, but as to not crash the program do a blocking wait + source.wait(); } - return ret; - } else { - unreachable; // yieldForCompletition can only be used from a task + // close manually + source.deinit(); } + return ret; } };