From 4de5a08487f0fc7f51a501b6646d661b46d3f8e7 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Sun, 23 Jun 2024 04:13:26 +0900 Subject: [PATCH] coro: fix threadpool deadlocks --- src/aio/common/posix.zig | 3 ++- src/coro.zig | 25 ++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig index da25c54..05ebcce 100644 --- a/src/aio/common/posix.zig +++ b/src/aio/common/posix.zig @@ -7,6 +7,7 @@ pub const PIDFD_NONBLOCK = @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK") pub const EventSource = struct { fd: std.posix.fd_t, + counter: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), pub inline fn init() !@This() { if (comptime @hasDecl(std.posix.system, "eventfd")) { @@ -33,7 +34,7 @@ pub const EventSource = struct { _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch continue; } else if (comptime @hasDecl(std.posix.system, "kqueue")) { _ = std.posix.kevent(self.fd, &.{.{ - .ident = @intCast(self.fd), + .ident = self.counter.fetchAdd(1, .monotonic), .filter = std.posix.system.EVFILT_USER, .flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT, .fflags = std.posix.system.NOTE_TRIGGER, diff --git a/src/coro.zig b/src/coro.zig index 4171e8a..6b627cb 100644 --- a/src/coro.zig +++ b/src/coro.zig @@ -422,6 +422,7 @@ test "Wakeup" { pub const ThreadPool = struct { pool: std.Thread.Pool = undefined, source: aio.EventSource = undefined, + num_tasks: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), /// Spin up the pool, `allocator` is used to allocate the tasks /// If `num_jobs` is zero, the thread count for the current CPU is used @@ -437,15 +438,11 @@ pub const ThreadPool = struct { self.* = undefined; } - const Sync = struct { - source: aio.EventSource, - completed: bool = false, - }; - - inline fn entrypoint(sync: *Sync, comptime func: anytype, ret: anytype, args: anytype) void { + inline fn entrypoint(self: *@This(), completed: *bool, comptime func: anytype, ret: anytype, args: anytype) void { ret.* = @call(.auto, func, args); - sync.completed = true; - sync.source.notify(); + completed.* = true; + const n = self.num_tasks.load(.acquire); + for (0..n) |_| self.source.notify(); } const Error = error{SomeOperationFailed} || aio.Error || aio.EventSource.Error || aio.WaitEventSource.Error; @@ -460,18 +457,20 @@ pub const ThreadPool = struct { /// Yield until `func` finishes on another thread pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnType(@TypeOf(func)) { - var sync: Sync = .{ .source = self.source }; + var completed: bool = false; var ret: @typeInfo(@TypeOf(func)).Fn.return_type.? = undefined; - try self.pool.spawn(entrypoint, .{ &sync, func, &ret, args }); + _ = self.num_tasks.fetchAdd(1, .monotonic); + defer _ = self.num_tasks.fetchSub(1, .release); + try self.pool.spawn(entrypoint, .{ self, &completed, func, &ret, args }); var wait_err: aio.WaitEventSource.Error = error.Success; - while (!sync.completed) { + while (!completed) { if (try io.privateComplete(.{ - aio.WaitEventSource{ .source = sync.source, .link = .soft, .out_error = &wait_err }, + aio.WaitEventSource{ .source = self.source, .link = .soft, .out_error = &wait_err }, }, .io_waiting_thread) > 0) { // it's possible to end up here if aio implementation ran out of resources // in case of io_uring the application managed to fill up the submission queue // normally this should not happen, but as to not crash the program do a blocking wait - sync.source.wait(); + self.source.wait(); } } return ret;