diff --git a/src/aio/IoUring.zig b/src/aio/IoUring.zig index b7a312e..ac59b10 100644 --- a/src/aio/IoUring.zig +++ b/src/aio/IoUring.zig @@ -106,26 +106,15 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic. self.pending += len; } -pub const NOP = std.math.maxInt(u64); - /// TODO: give options perhaps? More customization? pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.CompletionCallback) aio.Error!aio.CompletionResult { if (self.pending == 0) return .{}; - if (mode == .nonblocking) { - var last = &self.io.sq.sqes[self.io.sq.sqe_tail & self.io.sq.mask]; - _ = try self.io.nop(NOP); - // it is error to have these on the last operation - last.flags &= ~@as(u8, std.os.linux.IOSQE_IO_LINK | std.os.linux.IOSQE_IO_HARDLINK); - self.pending += 1; - } - _ = try uring_submit(&self.io); var result: aio.CompletionResult = .{}; - const n = try uring_copy_cqes(&self.io, self.cqes, 1); + const n = try uring_copy_cqes(&self.io, self.cqes, if (mode == .nonblocking) 0 else 1, false); for (self.cqes[0..n]) |*cqe| { - if (cqe.user_data == NOP) continue; const uop = self.ops.get(@intCast(cqe.user_data)).*; var failed: bool = false; switch (uop) { @@ -139,7 +128,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynam } self.pending -= n; - result.num_completed = n - @intFromBool(mode == .nonblocking); + result.num_completed = n; return result; } @@ -152,7 +141,7 @@ pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 { var num_errors: u16 = 0; var cqes: [len]std.os.linux.io_uring_cqe = undefined; while (num > 0) { - const n = try uring_copy_cqes(&io, &cqes, num); + const n = try uring_copy_cqes(&io, &cqes, num, false); for (cqes[0..n]) |*cqe| { inline for (&work.ops, 0..) |*op, idx| if (idx == cqe.user_data) { uring_handle_completion(op, cqe) catch { @@ -320,9 +309,39 @@ inline fn uring_submit(io: *std.os.linux.IoUring) aio.Error!u16 { } } -inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16) aio.Error!u16 { +inline fn copy_cqes_ready(uring: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe) u32 { + const ready = uring.cq_ready(); + const count = @min(cqes.len, ready); + const head = uring.cq.head.* & uring.cq.mask; + + // before wrapping + const n = @min(uring.cq.cqes.len - head, count); + @memcpy(cqes[0..n], uring.cq.cqes[head..][0..n]); + + if (count > n) { + // wrap uring.cq.cqes + const w = count - n; + @memcpy(cqes[n..][0..w], uring.cq.cqes[0..w]); + } + + uring.cq_advance(count); + return count; +} + +// The std one assumes wait_nr == 0 means IORING_SETUP_SQPOLL, this is not true for us +inline fn copy_cqes(uring: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, wait_nr: u32, sqpoll: bool) !u32 { + const count = copy_cqes_ready(uring, cqes); + if (count > 0) return count; + if (!sqpoll or uring.cq_ring_needs_flush()) { + _ = try uring.enter(0, wait_nr, std.os.linux.IORING_ENTER_GETEVENTS); + return copy_cqes_ready(uring, cqes); + } + return 0; +} + +inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16, sqpoll: bool) aio.Error!u16 { while (true) { - const n = io.copy_cqes(cqes, len) catch |err| switch (err) { + const n = copy_cqes(io, cqes, len, sqpoll) catch |err| switch (err) { error.FileDescriptorInvalid => unreachable, error.FileDescriptorInBadState => unreachable, error.BufferInvalid => unreachable,