Skip to content

Commit

Permalink
io_uring: remove NOP queue on nonblocking mode
Browse files Browse the repository at this point in the history
We don't actually have to do this if we don't use the std api.
  • Loading branch information
Cloudef committed Jun 28, 2024
1 parent 81dd499 commit fdede2b
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions src/aio/IoUring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,15 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic.
self.pending += len;
}

pub const NOP = std.math.maxInt(u64);

/// TODO: give options perhaps? More customization?
pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.CompletionCallback) aio.Error!aio.CompletionResult {
if (self.pending == 0) return .{};

if (mode == .nonblocking) {
var last = &self.io.sq.sqes[self.io.sq.sqe_tail & self.io.sq.mask];
_ = try self.io.nop(NOP);
// it is error to have these on the last operation
last.flags &= ~@as(u8, std.os.linux.IOSQE_IO_LINK | std.os.linux.IOSQE_IO_HARDLINK);
self.pending += 1;
}

_ = try uring_submit(&self.io);

var result: aio.CompletionResult = .{};
const n = try uring_copy_cqes(&self.io, self.cqes, 1);
const n = try uring_copy_cqes(&self.io, self.cqes, if (mode == .nonblocking) 0 else 1, false);
for (self.cqes[0..n]) |*cqe| {
if (cqe.user_data == NOP) continue;
const uop = self.ops.get(@intCast(cqe.user_data)).*;
var failed: bool = false;
switch (uop) {
Expand All @@ -139,7 +128,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynam
}

self.pending -= n;
result.num_completed = n - @intFromBool(mode == .nonblocking);
result.num_completed = n;
return result;
}

Expand All @@ -152,7 +141,7 @@ pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 {
var num_errors: u16 = 0;
var cqes: [len]std.os.linux.io_uring_cqe = undefined;
while (num > 0) {
const n = try uring_copy_cqes(&io, &cqes, num);
const n = try uring_copy_cqes(&io, &cqes, num, false);
for (cqes[0..n]) |*cqe| {
inline for (&work.ops, 0..) |*op, idx| if (idx == cqe.user_data) {
uring_handle_completion(op, cqe) catch {
Expand Down Expand Up @@ -320,9 +309,39 @@ inline fn uring_submit(io: *std.os.linux.IoUring) aio.Error!u16 {
}
}

inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16) aio.Error!u16 {
inline fn copy_cqes_ready(uring: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe) u32 {
const ready = uring.cq_ready();
const count = @min(cqes.len, ready);
const head = uring.cq.head.* & uring.cq.mask;

// before wrapping
const n = @min(uring.cq.cqes.len - head, count);
@memcpy(cqes[0..n], uring.cq.cqes[head..][0..n]);

if (count > n) {
// wrap uring.cq.cqes
const w = count - n;
@memcpy(cqes[n..][0..w], uring.cq.cqes[0..w]);
}

uring.cq_advance(count);
return count;
}

// The std one assumes wait_nr == 0 means IORING_SETUP_SQPOLL, this is not true for us
inline fn copy_cqes(uring: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, wait_nr: u32, sqpoll: bool) !u32 {
const count = copy_cqes_ready(uring, cqes);
if (count > 0) return count;
if (!sqpoll or uring.cq_ring_needs_flush()) {
_ = try uring.enter(0, wait_nr, std.os.linux.IORING_ENTER_GETEVENTS);
return copy_cqes_ready(uring, cqes);
}
return 0;
}

inline fn uring_copy_cqes(io: *std.os.linux.IoUring, cqes: []std.os.linux.io_uring_cqe, len: u16, sqpoll: bool) aio.Error!u16 {
while (true) {
const n = io.copy_cqes(cqes, len) catch |err| switch (err) {
const n = copy_cqes(io, cqes, len, sqpoll) catch |err| switch (err) {
error.FileDescriptorInvalid => unreachable,
error.FileDescriptorInBadState => unreachable,
error.BufferInvalid => unreachable,
Expand Down

0 comments on commit fdede2b

Please sign in to comment.