diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig index d95c5ad..51f9253 100644 --- a/src/aio/common/posix.zig +++ b/src/aio/common/posix.zig @@ -25,33 +25,37 @@ pub const EventSource = struct { } pub inline fn notify(self: *@This()) void { - if (comptime @hasDecl(std.posix.system, "eventfd")) { - _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; - } else if (comptime @hasDecl(std.posix.system, "kqueue")) { - _ = std.posix.kevent(self.fd, &.{.{ - .ident = @intCast(self.fd), - .filter = std.posix.system.EVFILT_USER, - .flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT, - .fflags = std.posix.system.NOTE_TRIGGER, - .data = 0, - .udata = 0, - }}, &.{}, null) catch unreachable; - } else { - unreachable; + while (true) { + if (comptime @hasDecl(std.posix.system, "eventfd")) { + _ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch continue; + } else if (comptime @hasDecl(std.posix.system, "kqueue")) { + _ = std.posix.kevent(self.fd, &.{.{ + .ident = @intCast(self.fd), + .filter = std.posix.system.EVFILT_USER, + .flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT, + .fflags = std.posix.system.NOTE_TRIGGER, + .data = 0, + .udata = 0, + }}, &.{}, null) catch continue; + } else { + unreachable; + } + break; } } pub inline fn wait(self: *@This()) void { - if (comptime @hasDecl(std.posix.system, "eventfd")) { - var v: u64 = undefined; - _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable; - } else if (comptime @hasDecl(std.posix.system, "kqueue")) { - var pfds: [1]std.posix.pollfd = .{.{ .fd = self.fd, .events = std.posix.POLL.IN, .revents = 0 }}; - _ = std.posix.poll(&pfds, -1) catch unreachable; - var ev: [1]std.posix.Kevent = undefined; - _ = std.posix.kevent(self.fd, &.{}, &ev, null) catch unreachable; - } else { - unreachable; + while (true) { + if (comptime @hasDecl(std.posix.system, "eventfd")) { + var v: u64 = undefined; + _ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch continue; + } else if (comptime @hasDecl(std.posix.system, "kqueue")) { + var ev: [1]std.posix.Kevent = undefined; + _ = std.posix.kevent(self.fd, &.{}, &ev, null) catch continue; + } else { + unreachable; + } + break; } } }; diff --git a/src/coro.zig b/src/coro.zig index 71b24c0..41ced8b 100644 --- a/src/coro.zig +++ b/src/coro.zig @@ -380,14 +380,7 @@ pub const ThreadPool = struct { source.notify(); } - const Error = error{ - OutOfMemory, - SystemResources, - ProcessQuotaExceeded, - SystemQuotaExceeded, - Unexpected, - SomeOperationFailed, - } || aio.Error || aio.WaitEventSource.Error; + const Error = error{SomeOperationFailed} || aio.Error || aio.EventSource.Error || aio.WaitEventSource.Error; fn ReturnType(comptime Func: type) type { const base = @typeInfo(Func).Fn.return_type.?;