diff --git a/src/aio/Fallback.zig b/src/aio/Fallback.zig index e0d1261..ba59d5c 100644 --- a/src/aio/Fallback.zig +++ b/src/aio/Fallback.zig @@ -1,53 +1,44 @@ +const builtin = @import("builtin"); const std = @import("std"); +const posix = @import("common/posix.zig"); const aio = @import("../aio.zig"); const Operation = @import("ops.zig").Operation; const Pool = @import("common/types.zig").Pool; const FixedArrayList = @import("common/types.zig").FixedArrayList; -const posix = @import("common/posix.zig"); +const DoubleBufferedFixedArrayList = @import("common/types.zig").DoubleBufferedFixedArrayList; // This mess of a code just shows how much io_uring was needed -fn debug(comptime fmt: []const u8, args: anytype) void { - if (@import("builtin").is_test) { - std.debug.print("fallback: " ++ fmt ++ "\n", args); - } else { - if (comptime !aio.options.debug) return; - const log = std.log.scoped(.fallback); - log.debug(fmt, args); - } -} - pub const EventSource = posix.EventSource; const Result = struct { failure: Operation.Error, id: u16 }; -source: EventSource, -tpool: *std.Thread.Pool, ops: Pool(Operation.Union, u16), -next: []u16, -readiness: []posix.Readiness, -link_lock: std.DynamicBitSetUnmanaged, -started: std.DynamicBitSetUnmanaged, -pending: std.DynamicBitSetUnmanaged, -pfd: FixedArrayList(posix.pollfd, u32), prev_id: ?u16 = null, // for linking operations -finished: FixedArrayList(Result, u16), -finished_mutex: std.Thread.Mutex = .{}, -// copied on completition -finished_copy: FixedArrayList(Result, u16), +next: []u16, // linked operation, points to self if none +readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation +link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operation to finish first +pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll) +started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false +pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup +tpool: *std.Thread.Pool, // thread pool for performing operations, not all operations will be performed here +source: EventSource, // when threads finish, they signal it using this event source +finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe pub fn isSupported(_: []const type) bool { return true; // very optimistic :D } +fn minThreads() u32 { + // might need this on BSD too. + // it's not a great solution, but this at least lets apps that poll /dev/tty + // work with one dedicated thread given for it. + // unfortunately pselect/select which will work on /dev/tty are just too annoying + // to try and kludge into this backend. + return if (builtin.target.isDarwin()) 2 else 1; +} + pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { - var source = try EventSource.init(); - errdefer source.deinit(); - var tpool = try allocator.create(std.Thread.Pool); - tpool.init(.{ .allocator = allocator, .n_jobs = aio.options.num_threads }) catch |err| return switch (err) { - error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources, - else => |e| e, - }; var ops = try Pool(Operation.Union, u16).init(allocator, n); errdefer ops.deinit(allocator); const next = try allocator.alloc(u16, n); @@ -56,28 +47,35 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { errdefer allocator.free(readiness); var link_lock = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); errdefer link_lock.deinit(allocator); - var started = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); - errdefer started.deinit(allocator); var pending = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); errdefer pending.deinit(allocator); + var started = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n); + errdefer started.deinit(allocator); var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1); errdefer pfd.deinit(allocator); - var finished = try FixedArrayList(Result, u16).init(allocator, n); + var tpool = try allocator.create(std.Thread.Pool); + errdefer allocator.destroy(tpool); + const thread_count: u32 = aio.options.num_threads orelse @intCast(@max(minThreads(), std.Thread.getCpuCount() catch 1)); + tpool.init(.{ .allocator = allocator, .n_jobs = thread_count }) catch |err| return switch (err) { + error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources, + else => |e| e, + }; + errdefer tpool.deinit(); + var source = try EventSource.init(); + errdefer source.deinit(); + var finished = try DoubleBufferedFixedArrayList(Result, u16).init(allocator, n); errdefer finished.deinit(allocator); - var finished_copy = try FixedArrayList(Result, u16).init(allocator, n); - errdefer finished_copy.deinit(allocator); return .{ - .source = source, - .tpool = tpool, .ops = ops, .next = next, .readiness = readiness, .link_lock = link_lock, - .started = started, .pending = pending, + .started = started, .pfd = pfd, + .tpool = tpool, + .source = source, .finished = finished, - .finished_copy = finished_copy, }; } @@ -88,14 +86,13 @@ pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { while (iter.next()) |e| uopUnwrapCall(e.v, posix.closeReadiness, .{self.readiness[e.k]}); self.ops.deinit(allocator); allocator.free(self.next); + allocator.free(self.readiness); self.link_lock.deinit(allocator); - self.started.deinit(allocator); self.pending.deinit(allocator); - allocator.free(self.readiness); + self.started.deinit(allocator); self.pfd.deinit(allocator); - self.finished.deinit(allocator); - self.finished_copy.deinit(allocator); self.source.deinit(); + self.finished.deinit(allocator); self.* = undefined; } @@ -144,7 +141,7 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic. } else { var ids: std.BoundedArray(u16, len) = .{}; errdefer for (ids.constSlice()) |id| self.removeOp(id); - inline for (&work.ops) |*op| ids.append(try self.queueOperation(op)) catch unreachable; + inline for (&work.ops) |*op| ids.append(try self.queueOperation(op)) catch return error.SubmissionQueueFull; if (cb) |f| for (ids.constSlice()) |id| f(self.ops.nodes[id].used, @enumFromInt(id)); } } @@ -207,15 +204,13 @@ pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 { } fn finish(self: *@This(), id: u16, failure: Operation.Error) void { - defer self.source.notify(); - self.finished_mutex.lock(); - defer self.finished_mutex.unlock(); debug("finish: {} {}", .{ id, failure }); - for (self.finished.items[0..self.finished.len]) |*i| if (i.id == id) { - i.* = .{ .id = id, .failure = failure }; - return; - }; + // for (self.finished.items[0..self.finished.len]) |*i| if (i.id == id) { + // i.* = .{ .id = id, .failure = failure }; + // return; + // }; self.finished.add(.{ .id = id, .failure = failure }) catch unreachable; + self.source.notify(); } fn cancel(self: *@This(), id: u16) enum { in_progress, not_found, ok } { @@ -346,16 +341,9 @@ fn completition(op: anytype, self: *@This(), res: Result) void { } fn handleFinished(self: *@This(), cb: ?aio.Dynamic.CompletionCallback) aio.CompletionResult { - { - self.finished_mutex.lock(); - defer self.finished_mutex.unlock(); - @memcpy(self.finished_copy.items[0..self.finished.items.len], self.finished.items[0..self.finished.items.len]); - self.finished_copy.len = self.finished.len; - self.finished.reset(); - } - + const finished = self.finished.swap(); var num_errors: u16 = 0; - for (self.finished_copy.items[0..self.finished_copy.len]) |res| { + for (finished) |res| { if (res.failure != error.Success) { debug("complete: {}: {} [FAIL] {}", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used), res.failure }); } else { @@ -374,8 +362,7 @@ fn handleFinished(self: *@This(), cb: ?aio.Dynamic.CompletionCallback) aio.Compl self.removeOp(res.id); if (cb) |f| f(uop, @enumFromInt(res.id), res.failure != error.Success); } - - return .{ .num_completed = self.finished_copy.len, .num_errors = num_errors }; + return .{ .num_completed = @intCast(finished.len), .num_errors = num_errors }; } fn uopUnwrapCall(uop: *Operation.Union, comptime func: anytype, args: anytype) @typeInfo(@TypeOf(func)).Fn.return_type.? { @@ -384,3 +371,13 @@ fn uopUnwrapCall(uop: *Operation.Union, comptime func: anytype, args: anytype) @ } unreachable; } + +fn debug(comptime fmt: []const u8, args: anytype) void { + if (@import("builtin").is_test) { + std.debug.print("fallback: " ++ fmt ++ "\n", args); + } else { + if (comptime !aio.options.debug) return; + const log = std.log.scoped(.fallback); + log.debug(fmt, args); + } +} diff --git a/src/aio/common/posix.zig b/src/aio/common/posix.zig index 3ba583e..149bcd2 100644 --- a/src/aio/common/posix.zig +++ b/src/aio/common/posix.zig @@ -4,7 +4,7 @@ const Operation = @import("../ops.zig").Operation; const windows = @import("windows.zig"); pub const RENAME_NOREPLACE = 1 << 0; -pub const PIDFD_NONBLOCK = @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK")); +pub const O_NONBLOCK = @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK")); const EventFd = struct { fd: std.posix.fd_t, @@ -275,8 +275,18 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness { return switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { .nop => .{}, .fsync => .{}, - .write => .{ .fd = op.file.handle, .mode = .out }, - .read => .{ .fd = op.file.handle, .mode = .in }, + .write => blk: { + if (builtin.target.isDarwin() and std.posix.isatty(op.file.handle)) { + return .{}; // nice :D will block one thread + } + break :blk .{ .fd = op.file.handle, .mode = .out }; + }, + .read => blk: { + if (builtin.target.isDarwin() and std.posix.isatty(op.file.handle)) { + return .{}; // nice :D will block one thread + } + break :blk .{ .fd = op.file.handle, .mode = .in }; + }, .accept, .recv => .{ .fd = op.socket, .mode = .in }, .socket, .connect => .{}, .send => .{ .fd = op.socket, .mode = .out }, @@ -301,7 +311,7 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness { if (builtin.target.os.tag == .windows) { @panic("fixme"); } else if (comptime @hasDecl(std.posix.system, "pidfd_open")) { - const res = std.posix.system.pidfd_open(op.child, PIDFD_NONBLOCK); + const res = std.posix.system.pidfd_open(op.child, O_NONBLOCK); const e = std.posix.errno(res); if (e != .SUCCESS) return switch (e) { .INVAL, .SRCH => unreachable, diff --git a/src/aio/common/types.zig b/src/aio/common/types.zig index 97d5a41..588924c 100644 --- a/src/aio/common/types.zig +++ b/src/aio/common/types.zig @@ -5,9 +5,7 @@ pub fn FixedArrayList(T: type, SZ: type) type { items: []T, len: SZ = 0, - pub const Error = error{ - OutOfMemory, - }; + pub const Error = error{OutOfMemory}; pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { return .{ .items = try allocator.alloc(T, n) }; @@ -30,6 +28,50 @@ pub fn FixedArrayList(T: type, SZ: type) type { }; } +pub fn DoubleBufferedFixedArrayList(T: type, SZ: type) type { + return struct { + mutex: std.Thread.Mutex = .{}, + safe: FixedArrayList(T, SZ), + copy: []T align(std.atomic.cache_line), + + pub const Error = error{OutOfMemory}; + + pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { + var safe = try FixedArrayList(T, SZ).init(allocator, n); + errdefer safe.deinit(allocator); + const copy = try allocator.alloc(T, n); + errdefer allocator.free(copy); + return .{ .safe = safe, .copy = copy }; + } + + pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + self.safe.deinit(allocator); + allocator.free(self.copy); + self.* = undefined; + } + + pub fn add(self: *@This(), item: T) Error!void { + self.mutex.lock(); + defer self.mutex.unlock(); + try self.safe.add(item); + } + + pub fn reset(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + self.safe.reset(); + } + + pub fn swap(self: *@This()) []const T { + self.mutex.lock(); + defer self.mutex.unlock(); + defer self.safe.reset(); + @memcpy(self.copy[0..self.safe.len], self.safe.items[0..self.safe.len]); + return self.copy[0..self.safe.len]; + } + }; +} + pub fn Pool(T: type, SZ: type) type { return struct { pub const Node = union(enum) { free: ?SZ, used: T }; @@ -38,9 +80,7 @@ pub fn Pool(T: type, SZ: type) type { num_free: SZ = 0, num_used: SZ = 0, - pub const Error = error{ - OutOfMemory, - }; + pub const Error = error{OutOfMemory}; pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { return .{ .nodes = try allocator.alloc(Node, n) };