From 17bbd33865627f6f7982fbff9198948793ee9158 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Sat, 29 Jun 2024 11:11:46 +0900 Subject: [PATCH] minilib: DynamicThreadPool normal init Since the threads are now started on demand and not in the start() function the init can return the struct. --- examples/coro_wttr.zig | 3 +-- src/aio/Fallback.zig | 14 ++++---------- src/coro/ThreadPool.zig | 25 +++++++++++++------------ src/minilib/DynamicThreadPool.zig | 29 ++++++++++++++++------------- 4 files changed, 34 insertions(+), 37 deletions(-) diff --git a/examples/coro_wttr.zig b/examples/coro_wttr.zig index 595abb8..38002c2 100644 --- a/examples/coro_wttr.zig +++ b/examples/coro_wttr.zig @@ -79,8 +79,7 @@ pub fn main() !void { var completed: u32 = 0; const ltask = try scheduler.spawn(loader, .{ &completed, &max }, .{}); - var tpool: coro.ThreadPool = .{}; - try tpool.start(gpa.allocator(), .{}); + var tpool: coro.ThreadPool = try coro.ThreadPool.init(gpa.allocator(), .{}); defer tpool.deinit(); var tasks = std.ArrayList(coro.Task.Generic(anyerror![]const u8)).init(allocator); diff --git a/src/aio/Fallback.zig b/src/aio/Fallback.zig index 1209561..b30d6a6 100644 --- a/src/aio/Fallback.zig +++ b/src/aio/Fallback.zig @@ -37,8 +37,8 @@ link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operat pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll) started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup -tpool: *DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here -kludge_tpool: *DynamicThreadPool, // thread pool for performing operations which can't be polled for readiness +tpool: DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here +kludge_tpool: DynamicThreadPool, // thread pool for performing operations which can't be polled for readiness source: EventSource, // when threads finish, they signal it using this event source finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe @@ -61,16 +61,12 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { errdefer started.deinit(allocator); var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1); errdefer pfd.deinit(allocator); - var tpool = try allocator.create(DynamicThreadPool); - errdefer allocator.destroy(tpool); - tpool.init(allocator, .{ .max_threads = aio.options.max_threads }) catch |err| return switch (err) { + var tpool = DynamicThreadPool.init(allocator, .{ .max_threads = aio.options.max_threads }) catch |err| return switch (err) { error.TimerUnsupported => error.SystemOutdated, else => |e| e, }; errdefer tpool.deinit(); - var kludge_tpool = try allocator.create(DynamicThreadPool); - errdefer allocator.destroy(kludge_tpool); - kludge_tpool.init(allocator, .{ .max_threads = aio.options.fallback_max_kludge_threads }) catch |err| return switch (err) { + var kludge_tpool = DynamicThreadPool.init(allocator, .{ .max_threads = aio.options.fallback_max_kludge_threads }) catch |err| return switch (err) { error.TimerUnsupported => error.SystemOutdated, else => |e| e, }; @@ -96,9 +92,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() { pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { self.tpool.deinit(); - allocator.destroy(self.tpool); self.kludge_tpool.deinit(); - allocator.destroy(self.kludge_tpool); var iter = self.ops.iterator(); while (iter.next()) |e| uopUnwrapCall(e.v, posix.closeReadiness, .{self.readiness[e.k]}); self.ops.deinit(allocator); diff --git a/src/coro/ThreadPool.zig b/src/coro/ThreadPool.zig index 3b66352..498fd78 100644 --- a/src/coro/ThreadPool.zig +++ b/src/coro/ThreadPool.zig @@ -8,18 +8,19 @@ const DynamicThreadPool = @import("minilib").DynamicThreadPool; const ReturnType = @import("minilib").ReturnType; const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet; -pool: DynamicThreadPool = undefined, -source: aio.EventSource = undefined, +pool: DynamicThreadPool, +source: aio.EventSource, num_tasks: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), -/// Spin up the pool, `allocator` is used to allocate the tasks -pub fn start(self: *@This(), allocator: std.mem.Allocator, options: DynamicThreadPool.Options) !void { - self.* = .{ - .pool = undefined, - .source = try aio.EventSource.init(), - }; - errdefer self.source.deinit(); - try self.pool.init(allocator, options); +pub const InitError = DynamicThreadPool.InitError || aio.EventSource.Error; + +/// `allocator` is used to allocate the tasks +pub fn init(allocator: std.mem.Allocator, options: DynamicThreadPool.Options) InitError!@This() { + var pool = try DynamicThreadPool.init(allocator, options); + errdefer pool.deinit(); + var source = try aio.EventSource.init(); + errdefer source.deinit(); + return .{ .pool = pool, .source = source }; } pub fn deinit(self: *@This()) void { @@ -75,6 +76,7 @@ pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) Return /// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result /// Normally one would use the `spawnForCompletition` method, but in case a generic functions return type can't be deduced, use this any variant. pub fn spawnAnyForCompletition(self: *@This(), scheduler: *Scheduler, Result: type, func: anytype, args: anytype, opts: Scheduler.SpawnOptions) Scheduler.SpawnError!Task { + // TODO: optimize the stack size return scheduler.spawnAny(Result, yieldForCompletition, .{ self, func, args }, opts); } @@ -119,8 +121,7 @@ test "ThreadPool" { var scheduler = try Scheduler.init(std.testing.allocator, .{}); defer scheduler.deinit(); - var pool: ThreadPool = .{}; - try pool.start(std.testing.allocator, .{}); + var pool: ThreadPool = try ThreadPool.init(std.testing.allocator, .{}); defer pool.deinit(); for (0..10) |_| _ = try scheduler.spawn(Test.task, .{&pool}, .{}); diff --git a/src/minilib/DynamicThreadPool.zig b/src/minilib/DynamicThreadPool.zig index 62b4350..2508216 100644 --- a/src/minilib/DynamicThreadPool.zig +++ b/src/minilib/DynamicThreadPool.zig @@ -20,7 +20,7 @@ idling_threads: u32 = 0, active_threads: u32 = 0, timeout: u64, // used to serialize the acquisition order -serial: std.DynamicBitSetUnmanaged align(std.atomic.cache_line) = undefined, +serial: std.DynamicBitSetUnmanaged align(std.atomic.cache_line), const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: RunProto }; @@ -35,24 +35,25 @@ pub const Options = struct { pub const InitError = error{OutOfMemory} || std.time.Timer.Error; -pub fn init(self: *@This(), allocator: std.mem.Allocator, options: Options) InitError!void { - self.* = .{ - .allocator = allocator, - .timeout = options.timeout, - }; - +pub fn init(allocator: std.mem.Allocator, options: Options) InitError!@This() { if (builtin.single_threaded) { - return; + return .{ .allocator = undefined, .timeout = undefined, .serial = undefined }; } _ = try std.time.Timer.start(); // check that we have a timer const thread_count = options.max_threads orelse @max(1, std.Thread.getCpuCount() catch 1); - self.serial = try std.DynamicBitSetUnmanaged.initEmpty(allocator, thread_count); - errdefer self.serial.deinit(allocator); - self.threads = try allocator.alloc(DynamicThread, thread_count); - errdefer allocator.free(self.threads); - @memset(self.threads, .{}); + var serial = try std.DynamicBitSetUnmanaged.initEmpty(allocator, thread_count); + errdefer serial.deinit(allocator); + const threads = try allocator.alloc(DynamicThread, thread_count); + errdefer allocator.free(threads); + @memset(threads, .{}); + return .{ + .allocator = allocator, + .timeout = options.timeout, + .serial = serial, + .threads = threads, + }; } pub fn deinit(self: *@This()) void { @@ -118,6 +119,8 @@ pub fn spawn(self: *@This(), comptime func: anytype, args: anytype) SpawnError!v } } + // TODO: Optimize closure allocations + // Closures are often same size, so they can be bucketed and reused const closure = try self.allocator.create(Closure); closure.* = .{ .arguments = args }; self.run_queue.prepend(&closure.run_node);