From 3c66f5a5a1f6dff532e22cb666001e4028acc7ce Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Thu, 27 Jun 2024 13:35:02 +0900 Subject: [PATCH] aio/coro: move common utilities to minilib module --- build.zig | 11 +- examples/coro_wttr.zig | 2 +- src/aio/Fallback.zig | 28 ++- src/aio/IoUring.zig | 8 +- src/coro/Frame.zig | 6 +- src/coro/Scheduler.zig | 9 +- src/coro/Task.zig | 3 +- src/coro/ThreadPool.zig | 30 ++-- src/coro/common.zig | 47 ----- src/coro/io.zig | 17 +- src/minilib.zig | 45 +++++ src/minilib/DynamicThreadPool.zig | 160 ++++++++++++++++++ src/minilib/fixed_array_list.zig | 77 +++++++++ src/{aio/common.zig => minilib/item_pool.zig} | 78 +-------- 14 files changed, 363 insertions(+), 158 deletions(-) delete mode 100644 src/coro/common.zig create mode 100644 src/minilib.zig create mode 100644 src/minilib/DynamicThreadPool.zig create mode 100644 src/minilib/fixed_array_list.zig rename src/{aio/common.zig => minilib/item_pool.zig} (54%) diff --git a/build.zig b/build.zig index 8fa7e42..953e177 100644 --- a/build.zig +++ b/build.zig @@ -8,12 +8,19 @@ pub fn build(b: *std.Build) void { const fallback = b.option(bool, "fallback", "use fallback event loop") orelse false; opts.addOption(bool, "fallback", fallback); + const minilib = b.addModule("minilib", .{ + .root_source_file = b.path("src/minilib.zig"), + .target = target, + .optimize = optimize, + }); + const aio = b.addModule("aio", .{ .root_source_file = b.path("src/aio.zig"), .target = target, .optimize = optimize, .link_libc = target.query.os_tag == .windows, }); + aio.addImport("minilib", minilib); aio.addImport("build_options", opts.createModule()); const coro = b.addModule("coro", .{ @@ -21,6 +28,7 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + coro.addImport("minilib", minilib); coro.addImport("aio", aio); const run_all = b.step("run", "Run all examples"); @@ -48,7 +56,7 @@ pub fn build(b: *std.Build) void { const test_filter = b.option([]const u8, "test-filter", "Skip tests that do not match any filter") orelse ""; const test_step = b.step("test", "Run unit tests"); - inline for (.{ .aio, .coro }) |mod| { + inline for (.{ .minilib, .aio, .coro }) |mod| { const tst = b.addTest(.{ .root_source_file = b.path("src/" ++ @tagName(mod) ++ ".zig"), .target = target, @@ -56,6 +64,7 @@ pub fn build(b: *std.Build) void { .filters = &.{test_filter}, .link_libc = target.query.os_tag == .windows, }); + if (mod != .minilib) tst.root_module.addImport("minilib", minilib); if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule()); if (mod == .coro) tst.root_module.addImport("aio", aio); const run = b.addRunArtifact(tst); diff --git a/examples/coro_wttr.zig b/examples/coro_wttr.zig index 67350a9..133cc9d 100644 --- a/examples/coro_wttr.zig +++ b/examples/coro_wttr.zig @@ -70,7 +70,7 @@ pub fn main() !void { const ltask = try scheduler.spawn(loader, .{ &completed, &max }, .{}); var tpool: coro.ThreadPool = .{}; - try tpool.start(gpa.allocator(), 1); + try tpool.start(gpa.allocator(), .{}); defer tpool.deinit(); var tasks = std.ArrayList(coro.Task.Generic(anyerror![]const u8)).init(allocator); diff --git a/src/aio/Fallback.zig b/src/aio/Fallback.zig index 011f1a2..9b6b308 100644 --- a/src/aio/Fallback.zig +++ b/src/aio/Fallback.zig @@ -3,9 +3,10 @@ const std = @import("std"); const aio = @import("../aio.zig"); const posix = @import("posix.zig"); const Operation = @import("ops.zig").Operation; -const Pool = @import("common.zig").Pool; -const FixedArrayList = 
@import("common.zig").FixedArrayList; -const DoubleBufferedFixedArrayList = @import("common.zig").DoubleBufferedFixedArrayList; +const ItemPool = @import("minilib").ItemPool; +const FixedArrayList = @import("minilib").FixedArrayList; +const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArrayList; +const DynamicThreadPool = @import("minilib").DynamicThreadPool; // This tries to emulate io_uring functionality. // If something does not match how it works on io_uring on linux, it should be change to match. @@ -14,11 +15,20 @@ const DoubleBufferedFixedArrayList = @import("common.zig").DoubleBufferedFixedAr // However it might be still more pleasant experience than (e)poll/kqueueing away as the behaviour should be // more or less consistent. +comptime { + if (builtin.single_threaded) { + @compileError( + \\Fallback backend requires building with threads as otherwise it may block the whole program. + \\To only target linux and io_uring, set `aio_options.fallback = .disable` in your root .zig file. + ); + } +} + pub const EventSource = posix.EventSource; const Result = struct { failure: Operation.Error, id: u16 }; -ops: Pool(Operation.Union, u16), +ops: ItemPool(Operation.Union, u16), prev_id: ?u16 = null, // for linking operations next: []u16, // linked operation, points to self if none readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation @@ -26,7 +36,7 @@ link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operat pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll) started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup -tpool: *std.Thread.Pool, // thread pool for performing operations, not all operations will be performed here +tpool: *DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here source: EventSource, // when threads finish, they signal it using this event source finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe @@ -53,7 +63,7 @@ fn minThreads() u32 { } pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { - var ops = try Pool(Operation.Union, u16).init(allocator, n); + var ops = try ItemPool(Operation.Union, u16).init(allocator, n); errdefer ops.deinit(allocator); const next = try allocator.alloc(u16, n); errdefer allocator.free(next); @@ -67,11 +77,11 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { errdefer started.deinit(allocator); var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1); errdefer pfd.deinit(allocator); - var tpool = try allocator.create(std.Thread.Pool); + var tpool = try allocator.create(DynamicThreadPool); errdefer allocator.destroy(tpool); const thread_count: u32 = aio.options.num_threads orelse @intCast(@max(minThreads(), std.Thread.getCpuCount() catch 1)); - tpool.init(.{ .allocator = allocator, .n_jobs = thread_count }) catch |err| return switch (err) { - error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources, + tpool.init(allocator, .{ .num_threads = thread_count }) catch |err| return switch (err) { + error.TimerUnsupported => error.SystemOutdated, else => |e| e, }; errdefer tpool.deinit(); diff --git a/src/aio/IoUring.zig b/src/aio/IoUring.zig index 4176cbf..556a883 100644 --- a/src/aio/IoUring.zig +++ 
b/src/aio/IoUring.zig @@ -1,7 +1,7 @@ const std = @import("std"); const aio = @import("../aio.zig"); const Operation = @import("ops.zig").Operation; -const Pool = @import("common.zig").Pool; +const ItemPool = @import("minilib").ItemPool; const posix = @import("posix.zig"); const linux = @import("posix/linux.zig"); const log = std.log.scoped(.io_uring); @@ -37,10 +37,10 @@ const Supported = struct { // Could also make io_uring based event source with `IORING_OP_MSG_RING` // However I've read some claims that passing messages to other ring has more // latency than actually using eventfd. Eventfd is simple and reliable. -pub const EventSource = posix.EventSource; +pub const EventSource = linux.EventSource; io: std.os.linux.IoUring, -ops: Pool(Operation.Union, u16), +ops: ItemPool(Operation.Union, u16), pub inline fn isSupported(op_types: []const type) bool { var ops: [op_types.len]std.os.linux.IORING_OP = undefined; @@ -78,7 +78,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { const n2 = std.math.ceilPowerOfTwo(u16, n) catch unreachable; var io = try uring_init(n2); errdefer io.deinit(); - const ops = try Pool(Operation.Union, u16).init(allocator, n2); + const ops = try ItemPool(Operation.Union, u16).init(allocator, n2); errdefer ops.deinit(allocator); return .{ .io = io, .ops = ops }; } diff --git a/src/coro/Frame.zig b/src/coro/Frame.zig index 5afde19..f1e75c7 100644 --- a/src/coro/Frame.zig +++ b/src/coro/Frame.zig @@ -2,10 +2,10 @@ const std = @import("std"); const aio = @import("aio"); const Fiber = @import("zefi.zig"); const Scheduler = @import("Scheduler.zig"); -const common = @import("common.zig"); +const Link = @import("minilib").Link; const log = std.log.scoped(.coro); -pub const List = std.DoublyLinkedList(common.Link(@This(), "link", .double)); +pub const List = std.DoublyLinkedList(Link(@This(), "link", .double)); pub const stack_alignment = Fiber.stack_alignment; pub const Stack = Fiber.Stack; @@ -23,7 +23,7 @@ pub const Status = enum(u8) { } }; -pub const WaitList = std.SinglyLinkedList(common.Link(@This(), "wait_link", .single)); +pub const WaitList = std.SinglyLinkedList(Link(@This(), "wait_link", .single)); fiber: *Fiber, stack: ?Fiber.Stack = null, diff --git a/src/coro/Scheduler.zig b/src/coro/Scheduler.zig index 0ce1bd3..94aa062 100644 --- a/src/coro/Scheduler.zig +++ b/src/coro/Scheduler.zig @@ -3,9 +3,8 @@ const aio = @import("aio"); const io = @import("io.zig"); const Frame = @import("Frame.zig"); const Task = @import("Task.zig"); -const common = @import("common.zig"); +const ReturnType = @import("minilib").ReturnType; const options = @import("../coro.zig").options; -const ReturnType = common.ReturnType; allocator: std.mem.Allocator, io: aio.Dynamic, @@ -100,20 +99,22 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void { } fn ioQueue(uop: aio.Dynamic.Uop, id: aio.Id) void { + const OperationContext = @import("io.zig").OperationContext; switch (uop) { inline else => |*op| { std.debug.assert(op.userdata != 0); - var ctx: *common.OperationContext = @ptrFromInt(op.userdata); + var ctx: *OperationContext = @ptrFromInt(op.userdata); ctx.id = id; }, } } fn ioCompletition(uop: aio.Dynamic.Uop, _: aio.Id, failed: bool) void { + const OperationContext = @import("io.zig").OperationContext; switch (uop) { inline else => |*op| { std.debug.assert(op.userdata != 0); - var ctx: *common.OperationContext = @ptrFromInt(op.userdata); + var ctx: *OperationContext = @ptrFromInt(op.userdata); var frame: *Frame = ctx.whole.frame; 
std.debug.assert(ctx.whole.num_operations > 0); ctx.completed = true; diff --git a/src/coro/Task.zig b/src/coro/Task.zig index 4010691..7232f84 100644 --- a/src/coro/Task.zig +++ b/src/coro/Task.zig @@ -1,7 +1,6 @@ const std = @import("std"); const aio = @import("aio"); -const ReturnType = @import("common.zig").ReturnType; -const ReturnTypeWithError = @import("common.zig").ReturnTypeWithError; +const ReturnType = @import("minilib").ReturnType; const Frame = @import("Frame.zig"); const Task = @This(); diff --git a/src/coro/ThreadPool.zig b/src/coro/ThreadPool.zig index f418cfe..7eb096f 100644 --- a/src/coro/ThreadPool.zig +++ b/src/coro/ThreadPool.zig @@ -4,19 +4,27 @@ const io = @import("io.zig"); const Scheduler = @import("Scheduler.zig"); const Task = @import("Task.zig"); const Frame = @import("Frame.zig"); -const ReturnTypeWithError = @import("common.zig").ReturnTypeWithError; -const ReturnType = @import("common.zig").ReturnType; +const DynamicThreadPool = @import("minilib").DynamicThreadPool; +const ReturnType = @import("minilib").ReturnType; +const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet; -pool: std.Thread.Pool = undefined, +pool: DynamicThreadPool = undefined, source: aio.EventSource = undefined, num_tasks: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), /// Spin up the pool, `allocator` is used to allocate the tasks -/// If `num_jobs` is zero, the thread count for the current CPU is used -pub fn start(self: *@This(), allocator: std.mem.Allocator, num_jobs: u32) !void { - self.* = .{ .pool = .{ .allocator = undefined, .threads = undefined }, .source = try aio.EventSource.init() }; +/// If `num_threads` is null, the thread count for the current CPU is used +pub fn start(self: *@This(), allocator: std.mem.Allocator, options: DynamicThreadPool.Options) !void { + self.* = .{ + .pool = .{ + .allocator = undefined, + .threads = undefined, + .timeout = undefined, + }, + .source = try aio.EventSource.init(), + }; errdefer self.source.deinit(); - try self.pool.init(.{ .allocator = allocator, .n_jobs = if (num_jobs == 0) null else num_jobs }); + try self.pool.init(allocator, options); } pub fn deinit(self: *@This()) void { @@ -42,7 +50,7 @@ inline fn entrypoint(self: *@This(), completed: *bool, token: *CancellationToken } /// Yield until `func` finishes on another thread -pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnTypeWithError(func, std.Thread.SpawnError) { +pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError) { var completed: bool = false; var res: ReturnType(func) = undefined; _ = self.num_tasks.fetchAdd(1, .monotonic); @@ -77,12 +85,12 @@ pub fn spawnAnyForCompletition(self: *@This(), scheduler: *Scheduler, Result: ty /// Helper for getting the Task.Generic when using spawnForCompletition tasks. 
pub fn Generic2(comptime func: anytype) type { - return Task.Generic(ReturnTypeWithError(func, std.Thread.SpawnError)); + return Task.Generic(ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError)); } /// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result pub fn spawnForCompletition(self: *@This(), scheduler: *Scheduler, func: anytype, args: anytype, opts: Scheduler.SpawnOptions) Scheduler.SpawnError!Generic2(func) { - const Result = ReturnTypeWithError(func, std.Thread.SpawnError); + const Result = ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError); const task = try self.spawnAnyForCompletition(scheduler, Result, func, args, opts); return task.generic(Result); } @@ -117,7 +125,7 @@ test "ThreadPool" { defer scheduler.deinit(); var pool: ThreadPool = .{}; - try pool.start(std.testing.allocator, 0); + try pool.start(std.testing.allocator, .{}); defer pool.deinit(); for (0..10) |_| _ = try scheduler.spawn(Test.task, .{&pool}, .{}); diff --git a/src/coro/common.zig b/src/coro/common.zig deleted file mode 100644 index 6dc2801..0000000 --- a/src/coro/common.zig +++ /dev/null @@ -1,47 +0,0 @@ -const std = @import("std"); -const aio = @import("aio"); -const Frame = @import("Frame.zig"); - -pub const WholeContext = struct { - num_operations: u16, - num_errors: u16 = 0, - frame: *Frame, -}; - -pub const OperationContext = struct { - whole: *WholeContext, - id: ?aio.Id = null, - completed: bool = false, -}; - -pub fn Link(comptime T: type, comptime field: []const u8, comptime container: enum { single, double }) type { - return struct { - pub inline fn cast(self: *@This()) *T { - switch (container) { - .single => { - const node: *std.SinglyLinkedList(@This()).Node = @alignCast(@fieldParentPtr("data", self)); - return @fieldParentPtr(field, node); - }, - .double => { - const node: *std.DoublyLinkedList(@This()).Node = @alignCast(@fieldParentPtr("data", self)); - return @fieldParentPtr(field, node); - }, - } - } - }; -} - -pub fn ReturnType(comptime func: anytype) type { - return @typeInfo(@TypeOf(func)).Fn.return_type.?; -} - -pub fn ReturnTypeWithError(comptime func: anytype, comptime E: type) type { - return MixErrorUnion(ReturnType(func), E); -} - -pub fn MixErrorUnion(comptime T: type, comptime E: type) type { - return switch (@typeInfo(T)) { - .ErrorUnion => |eu| (E || eu.error_set)!eu.payload, - else => E!T, - }; -} diff --git a/src/coro/io.zig b/src/coro/io.zig index a65f578..1079fa6 100644 --- a/src/coro/io.zig +++ b/src/coro/io.zig @@ -2,7 +2,18 @@ const std = @import("std"); const aio = @import("aio"); const Scheduler = @import("Scheduler.zig"); const Frame = @import("Frame.zig"); -const common = @import("common.zig"); + +pub const WholeContext = struct { + num_operations: u16, + num_errors: u16 = 0, + frame: *Frame, +}; + +pub const OperationContext = struct { + whole: *WholeContext, + id: ?aio.Id = null, + completed: bool = false, +}; pub const Error = aio.Error || error{Canceled}; @@ -12,8 +23,8 @@ pub fn do(operations: anytype, status: Frame.Status) Error!u16 { if (frame.canceled) return error.Canceled; var work = struct { ops: @TypeOf(operations) }{ .ops = operations }; - var whole: common.WholeContext = .{ .num_operations = operations.len, .frame = frame }; - var ctx_list: [operations.len]common.OperationContext = undefined; + var whole: WholeContext = .{ .num_operations = operations.len, .frame = frame }; + var ctx_list: [operations.len]OperationContext = undefined; inline for (&work.ops, &ctx_list) |*op, *ctx| { ctx.* = 
.{ .whole = &whole };
diff --git a/src/minilib.zig b/src/minilib.zig
new file mode 100644
index 0000000..7756cef
--- /dev/null
+++ b/src/minilib.zig
@@ -0,0 +1,45 @@
+pub const DynamicThreadPool = @import("minilib/DynamicThreadPool.zig");
+pub const FixedArrayList = @import("minilib/fixed_array_list.zig").FixedArrayList;
+pub const DoubleBufferedFixedArrayList = @import("minilib/fixed_array_list.zig").DoubleBufferedFixedArrayList;
+pub const ItemPool = @import("minilib/item_pool.zig").ItemPool;
+
+const std = @import("std");
+
+/// Let's say you have a linked list:
+/// `const List = std.SinglyLinkedList(Link(SomeStruct, "member", .single));`
+/// You can now store a field `member: List.Node` in `SomeStruct` and retrieve a `*SomeStruct` by calling `cast()` on `List.Node`.
+pub fn Link(comptime T: type, comptime field: []const u8, comptime container: enum { single, double }) type {
+    return struct {
+        pub inline fn cast(self: *@This()) *T {
+            switch (container) {
+                .single => {
+                    const node: *std.SinglyLinkedList(@This()).Node = @alignCast(@fieldParentPtr("data", self));
+                    return @fieldParentPtr(field, node);
+                },
+                .double => {
+                    const node: *std.DoublyLinkedList(@This()).Node = @alignCast(@fieldParentPtr("data", self));
+                    return @fieldParentPtr(field, node);
+                },
+            }
+        }
+    };
+}
+
+/// Returns the return type of a function
+pub fn ReturnType(comptime func: anytype) type {
+    return @typeInfo(@TypeOf(func)).Fn.return_type orelse @compileError("Return type of a generic function could not be deduced");
+}
+
+/// Returns the return type of a function with an error set mixed into it.
+/// The return type will be converted into an error union if it wasn't one already.
+pub fn ReturnTypeMixedWithErrorSet(comptime func: anytype, comptime E: type) type {
+    return MixErrorUnionWithErrorSet(ReturnType(func), E);
+}
+
+/// Mixes an error union with an error set
+pub fn MixErrorUnionWithErrorSet(comptime T: type, comptime E: type) type {
+    return switch (@typeInfo(T)) {
+        .ErrorUnion => |eu| (E || eu.error_set)!eu.payload,
+        else => E!T,
+    };
+}
diff --git a/src/minilib/DynamicThreadPool.zig b/src/minilib/DynamicThreadPool.zig
new file mode 100644
index 0000000..7b005b2
--- /dev/null
+++ b/src/minilib/DynamicThreadPool.zig
@@ -0,0 +1,160 @@
+//! Basically `std.Thread.Pool`, but with support for a timeout.
+//! That is, if threads have been inactive for the specified timeout, the pool releases them.
+//! `num_threads` is also the maximum number of worker threads, but if there's not much activity,
+//! fewer threads are used.
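+//!
+//! A minimal usage sketch (illustrative only, based on the API declared in this file;
+//! `allocator`, `myTask`, and `arg` are placeholder names):
+//!
+//!   var pool: DynamicThreadPool = undefined;
+//!   try pool.init(allocator, .{ .num_threads = 4, .timeout = 2 * std.time.ns_per_s });
+//!   defer pool.deinit();
+//!   try pool.spawn(myTask, .{arg});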
+
+const builtin = @import("builtin");
+const std = @import("std");
+
+const DynamicThread = struct {
+    active: bool = false,
+    thread: ?std.Thread = null,
+};
+
+allocator: std.mem.Allocator,
+mutex: std.Thread.Mutex = .{},
+cond: std.Thread.Condition = .{},
+threads: []DynamicThread,
+run_queue: RunQueue = .{},
+timeout: u64,
+
+const RunQueue = std.SinglyLinkedList(Runnable);
+const Runnable = struct { runFn: RunProto };
+const RunProto = *const fn (*Runnable) void;
+
+pub const Options = struct {
+    // Use the cpu core count by default
+    num_threads: ?u32 = null,
+    // Inactivity timeout when the thread will be joined
+    timeout: u64 = 5 * std.time.ns_per_s,
+};
+
+pub const InitError = error{OutOfMemory} || std.time.Timer.Error;
+
+pub fn init(self: *@This(), allocator: std.mem.Allocator, options: Options) InitError!void {
+    self.* = .{
+        .allocator = allocator,
+        .threads = &[_]DynamicThread{},
+        .timeout = options.timeout,
+    };
+
+    if (builtin.single_threaded) {
+        return;
+    }
+
+    _ = try std.time.Timer.start(); // check that we have a timer
+    const thread_count = options.num_threads orelse @max(1, std.Thread.getCpuCount() catch 1);
+    self.threads = try allocator.alloc(DynamicThread, thread_count);
+    for (self.threads) |*dthread| dthread.* = .{};
+}
+
+pub fn deinit(self: *@This()) void {
+    if (!builtin.single_threaded) {
+        {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            for (self.threads) |*dthread| dthread.active = false;
+        }
+        self.cond.broadcast();
+        for (self.threads) |*dthread| if (dthread.thread) |thrd| thrd.join();
+        self.allocator.free(self.threads);
+    }
+    self.* = undefined;
+}
+
+pub const SpawnError = error{
+    OutOfMemory,
+    SystemResources,
+    LockedMemoryLimitExceeded,
+    ThreadQuotaExceeded,
+    Unexpected,
+};
+
+pub fn spawn(self: *@This(), comptime func: anytype, args: anytype) SpawnError!void {
+    if (builtin.single_threaded) {
+        @call(.auto, func, args);
+        return;
+    }
+
+    const Args = @TypeOf(args);
+    const Outer = @This();
+    const Closure = struct {
+        arguments: Args,
+        pool: *Outer,
+        run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
+
+        fn runFn(runnable: *Runnable) void {
+            const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable);
+            const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
+            @call(.auto, func, closure.arguments);
+            // The thread pool's allocator is protected by the mutex.
+            const mutex = &closure.pool.mutex;
+            mutex.lock();
+            defer mutex.unlock();
+            closure.pool.allocator.destroy(closure);
+        }
+    };
+
+    {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+
+        // Activate a new thread if the run queue is running hot
+        if (self.run_queue.first != null) {
+            for (self.threads) |*dthread| {
+                if (!dthread.active) {
+                    dthread.active = true;
+                    std.debug.assert(dthread.thread == null);
+                    dthread.thread = try std.Thread.spawn(.{}, worker, .{ self, dthread });
+                    break;
+                }
+            }
+        }
+
+        const closure = try self.allocator.create(Closure);
+        closure.* = .{ .arguments = args, .pool = self };
+        self.run_queue.prepend(&closure.run_node);
+    }
+
+    // Notify waiting threads outside the lock to try and keep the critical section small.
+    self.cond.signal();
+}
+
+fn worker(self: *@This(), thread: *DynamicThread) void {
+    var timer = std.time.Timer.start() catch unreachable;
+    main: while (thread.active) {
+        while (thread.active) {
+            // TODO: should serialize the acquisition order here so that
+            // threads will always pop the run queue in order
+            // this would make the busy threads always be at the beginning
+            // of the array, while less busy or dead threads are at the end
+            const node = blk: {
+                self.mutex.lock();
+                defer self.mutex.unlock();
+                break :blk self.run_queue.popFirst();
+            };
+            if (node) |run_node| {
+                const runFn = run_node.data.runFn;
+                runFn(&run_node.data);
+                timer.reset();
+            } else break;
+        }
+        if (thread.active) {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            const now = timer.read();
+            if (now >= self.timeout) break :main;
+            self.cond.timedWait(&self.mutex, self.timeout - now) catch break :main;
+        }
+    }
+
+    self.mutex.lock();
+    defer self.mutex.unlock();
+
+    if (thread.active) {
+        thread.active = false;
+        // the thread cleans up itself from here on
+        thread.thread.?.detach();
+        thread.thread = null;
+    }
+}
diff --git a/src/minilib/fixed_array_list.zig b/src/minilib/fixed_array_list.zig
new file mode 100644
index 0000000..218cc4f
--- /dev/null
+++ b/src/minilib/fixed_array_list.zig
@@ -0,0 +1,77 @@
+//! Like a bounded array, but the backing array is heap-allocated, so the bound is only known at runtime.
+//! DoubleBufferedFixedArrayList has thread-safe insertion and removal.
+//! swap() lets you retrieve a copy of the current state of the array in a thread-safe way.
+
+const std = @import("std");
+
+pub fn FixedArrayList(T: type, SZ: type) type {
+    return struct {
+        items: []T,
+        len: SZ = 0,
+
+        pub const Error = error{OutOfMemory};
+
+        pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
+            return .{ .items = try allocator.alloc(T, n) };
+        }
+
+        pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
+            allocator.free(self.items);
+            self.* = undefined;
+        }
+
+        pub fn add(self: *@This(), item: T) Error!void {
+            if (self.len >= self.items.len) return error.OutOfMemory;
+            self.items[self.len] = item;
+            self.len += 1;
+        }
+
+        pub fn reset(self: *@This()) void {
+            self.len = 0;
+        }
+    };
+}
+
+pub fn DoubleBufferedFixedArrayList(T: type, SZ: type) type {
+    return struct {
+        mutex: std.Thread.Mutex = .{},
+        safe: FixedArrayList(T, SZ),
+        copy: []T align(std.atomic.cache_line),
+
+        pub const Error = error{OutOfMemory};
+
+        pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
+            var safe = try FixedArrayList(T, SZ).init(allocator, n);
+            errdefer safe.deinit(allocator);
+            const copy = try allocator.alloc(T, n);
+            errdefer allocator.free(copy);
+            return .{ .safe = safe, .copy = copy };
+        }
+
+        pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
+            self.safe.deinit(allocator);
+            allocator.free(self.copy);
+            self.* = undefined;
+        }
+
+        pub fn add(self: *@This(), item: T) Error!void {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            try self.safe.add(item);
+        }
+
+        pub fn reset(self: *@This()) void {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            self.safe.reset();
+        }
+
+        pub fn swap(self: *@This()) []const T {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            defer self.safe.reset();
+            @memcpy(self.copy[0..self.safe.len], self.safe.items[0..self.safe.len]);
+            return self.copy[0..self.safe.len];
+        }
+    };
+}
diff --git a/src/aio/common.zig b/src/minilib/item_pool.zig
similarity index 54%
rename from src/aio/common.zig
rename to src/minilib/item_pool.zig
index 588924c..7efd443 100644
--- 
a/src/aio/common.zig +++ b/src/minilib/item_pool.zig @@ -1,78 +1,10 @@ -const std = @import("std"); - -pub fn FixedArrayList(T: type, SZ: type) type { - return struct { - items: []T, - len: SZ = 0, - - pub const Error = error{OutOfMemory}; - - pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { - return .{ .items = try allocator.alloc(T, n) }; - } - - pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - allocator.free(self.items); - self.* = undefined; - } - - pub fn add(self: *@This(), item: T) Error!void { - if (self.len >= self.items.len) return error.OutOfMemory; - self.items[self.len] = item; - self.len += 1; - } - - pub fn reset(self: *@This()) void { - self.len = 0; - } - }; -} +//! Item pool that returns stable ids +//! Insertion and removal is O(1) +//! Iteration is O(n) -pub fn DoubleBufferedFixedArrayList(T: type, SZ: type) type { - return struct { - mutex: std.Thread.Mutex = .{}, - safe: FixedArrayList(T, SZ), - copy: []T align(std.atomic.cache_line), - - pub const Error = error{OutOfMemory}; - - pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() { - var safe = try FixedArrayList(T, SZ).init(allocator, n); - errdefer safe.deinit(allocator); - const copy = try allocator.alloc(T, n); - errdefer allocator.free(copy); - return .{ .safe = safe, .copy = copy }; - } - - pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - self.safe.deinit(allocator); - allocator.free(self.copy); - self.* = undefined; - } - - pub fn add(self: *@This(), item: T) Error!void { - self.mutex.lock(); - defer self.mutex.unlock(); - try self.safe.add(item); - } - - pub fn reset(self: *@This()) void { - self.mutex.lock(); - defer self.mutex.unlock(); - self.safe.reset(); - } - - pub fn swap(self: *@This()) []const T { - self.mutex.lock(); - defer self.mutex.unlock(); - defer self.safe.reset(); - @memcpy(self.copy[0..self.safe.len], self.safe.items[0..self.safe.len]); - return self.copy[0..self.safe.len]; - } - }; -} +const std = @import("std"); -pub fn Pool(T: type, SZ: type) type { +pub fn ItemPool(T: type, SZ: type) type { return struct { pub const Node = union(enum) { free: ?SZ, used: T }; nodes: []Node,