minilib: DynamicThreadPool normal init
Since the threads are now started on demand and not in the start()
function, init() can return the struct.
Cloudef committed Jun 29, 2024
1 parent c2a3f77 commit 17bbd33
Showing 4 changed files with 34 additions and 37 deletions.
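
The rationale, as a standalone sketch: a pool that only starts threads on demand has no OS thread holding a pointer into it during initialization, so init() can build the struct in a local and return it by value. The LazyPool type below is a hypothetical miniature of this pattern, not code from this repository:

const std = @import("std");

const LazyPool = struct {
    allocator: std.mem.Allocator,
    started: bool = false,

    // Safe to return by value: nothing captures a pointer to the pool yet.
    // Under the old start(self: *@This(), ...) model, threads created during
    // initialization would have captured `self`, so the struct could not
    // be copied or moved after start().
    pub fn init(allocator: std.mem.Allocator) LazyPool {
        return .{ .allocator = allocator };
    }

    // The first spawn() pins `self`; from here on the pool must stay put.
    pub fn spawn(self: *LazyPool, comptime func: anytype, args: anytype) !void {
        self.started = true;
        const thread = try std.Thread.spawn(.{}, func, args);
        thread.detach();
    }
};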
3 changes: 1 addition & 2 deletions examples/coro_wttr.zig
@@ -79,8 +79,7 @@ pub fn main() !void {
     var completed: u32 = 0;
     const ltask = try scheduler.spawn(loader, .{ &completed, &max }, .{});
 
-    var tpool: coro.ThreadPool = .{};
-    try tpool.start(gpa.allocator(), .{});
+    var tpool: coro.ThreadPool = try coro.ThreadPool.init(gpa.allocator(), .{});
     defer tpool.deinit();
 
     var tasks = std.ArrayList(coro.Task.Generic(anyerror![]const u8)).init(allocator);
14 changes: 4 additions & 10 deletions src/aio/Fallback.zig
@@ -37,8 +37,8 @@ link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operat
 pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll)
 started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false
 pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup
-tpool: *DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here
-kludge_tpool: *DynamicThreadPool, // thread pool for performing operations which can't be polled for readiness
+tpool: DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here
+kludge_tpool: DynamicThreadPool, // thread pool for performing operations which can't be polled for readiness
 source: EventSource, // when threads finish, they signal it using this event source
 finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe
 
@@ -61,16 +61,12 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
     errdefer started.deinit(allocator);
     var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1);
     errdefer pfd.deinit(allocator);
-    var tpool = try allocator.create(DynamicThreadPool);
-    errdefer allocator.destroy(tpool);
-    tpool.init(allocator, .{ .max_threads = aio.options.max_threads }) catch |err| return switch (err) {
+    var tpool = DynamicThreadPool.init(allocator, .{ .max_threads = aio.options.max_threads }) catch |err| return switch (err) {
         error.TimerUnsupported => error.SystemOutdated,
         else => |e| e,
     };
     errdefer tpool.deinit();
-    var kludge_tpool = try allocator.create(DynamicThreadPool);
-    errdefer allocator.destroy(kludge_tpool);
-    kludge_tpool.init(allocator, .{ .max_threads = aio.options.fallback_max_kludge_threads }) catch |err| return switch (err) {
+    var kludge_tpool = DynamicThreadPool.init(allocator, .{ .max_threads = aio.options.fallback_max_kludge_threads }) catch |err| return switch (err) {
         error.TimerUnsupported => error.SystemOutdated,
         else => |e| e,
     };
@@ -96,9 +92,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
 
 pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
     self.tpool.deinit();
-    allocator.destroy(self.tpool);
     self.kludge_tpool.deinit();
-    allocator.destroy(self.kludge_tpool);
     var iter = self.ops.iterator();
     while (iter.next()) |e| uopUnwrapCall(e.v, posix.closeReadiness, .{self.readiness[e.k]});
     self.ops.deinit(allocator);
25 changes: 13 additions & 12 deletions src/coro/ThreadPool.zig
@@ -8,18 +8,19 @@ const DynamicThreadPool = @import("minilib").DynamicThreadPool;
 const ReturnType = @import("minilib").ReturnType;
 const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet;
 
-pool: DynamicThreadPool = undefined,
-source: aio.EventSource = undefined,
+pool: DynamicThreadPool,
+source: aio.EventSource,
 num_tasks: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
 
-/// Spin up the pool, `allocator` is used to allocate the tasks
-pub fn start(self: *@This(), allocator: std.mem.Allocator, options: DynamicThreadPool.Options) !void {
-    self.* = .{
-        .pool = undefined,
-        .source = try aio.EventSource.init(),
-    };
-    errdefer self.source.deinit();
-    try self.pool.init(allocator, options);
+pub const InitError = DynamicThreadPool.InitError || aio.EventSource.Error;
+
+/// `allocator` is used to allocate the tasks
+pub fn init(allocator: std.mem.Allocator, options: DynamicThreadPool.Options) InitError!@This() {
+    var pool = try DynamicThreadPool.init(allocator, options);
+    errdefer pool.deinit();
+    var source = try aio.EventSource.init();
+    errdefer source.deinit();
+    return .{ .pool = pool, .source = source };
 }
 
 pub fn deinit(self: *@This()) void {
@@ -75,6 +76,7 @@ pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) Return
 /// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result
 /// Normally one would use the `spawnForCompletition` method, but in case a generic functions return type can't be deduced, use this any variant.
 pub fn spawnAnyForCompletition(self: *@This(), scheduler: *Scheduler, Result: type, func: anytype, args: anytype, opts: Scheduler.SpawnOptions) Scheduler.SpawnError!Task {
+    // TODO: optimize the stack size
     return scheduler.spawnAny(Result, yieldForCompletition, .{ self, func, args }, opts);
 }

@@ -119,8 +121,7 @@ test "ThreadPool" {
     var scheduler = try Scheduler.init(std.testing.allocator, .{});
     defer scheduler.deinit();
 
-    var pool: ThreadPool = .{};
-    try pool.start(std.testing.allocator, .{});
+    var pool: ThreadPool = try ThreadPool.init(std.testing.allocator, .{});
     defer pool.deinit();
 
     for (0..10) |_| _ = try scheduler.spawn(Test.task, .{&pool}, .{});
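For call sites migrating off start(), a usage sketch of the new ThreadPool API; the error mapping mirrors what Fallback.zig does above, and the surrounding function and variables are assumptions:

var pool = coro.ThreadPool.init(gpa.allocator(), .{}) catch |err| switch (err) {
    // InitError = DynamicThreadPool.InitError || aio.EventSource.Error,
    // and DynamicThreadPool.InitError includes error.TimerUnsupported.
    error.TimerUnsupported => return error.SystemOutdated,
    else => |e| return e,
};
defer pool.deinit();
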
29 changes: 16 additions & 13 deletions src/minilib/DynamicThreadPool.zig
@@ -20,7 +20,7 @@ idling_threads: u32 = 0,
 active_threads: u32 = 0,
 timeout: u64,
 // used to serialize the acquisition order
-serial: std.DynamicBitSetUnmanaged align(std.atomic.cache_line) = undefined,
+serial: std.DynamicBitSetUnmanaged align(std.atomic.cache_line),
 
 const RunQueue = std.SinglyLinkedList(Runnable);
 const Runnable = struct { runFn: RunProto };
@@ -35,24 +35,25 @@ pub const Options = struct {
 
 pub const InitError = error{OutOfMemory} || std.time.Timer.Error;
 
-pub fn init(self: *@This(), allocator: std.mem.Allocator, options: Options) InitError!void {
-    self.* = .{
-        .allocator = allocator,
-        .timeout = options.timeout,
-    };
-
+pub fn init(allocator: std.mem.Allocator, options: Options) InitError!@This() {
     if (builtin.single_threaded) {
-        return;
+        return .{ .allocator = undefined, .timeout = undefined, .serial = undefined };
     }
 
     _ = try std.time.Timer.start(); // check that we have a timer
 
     const thread_count = options.max_threads orelse @max(1, std.Thread.getCpuCount() catch 1);
-    self.serial = try std.DynamicBitSetUnmanaged.initEmpty(allocator, thread_count);
-    errdefer self.serial.deinit(allocator);
-    self.threads = try allocator.alloc(DynamicThread, thread_count);
-    errdefer allocator.free(self.threads);
-    @memset(self.threads, .{});
+    var serial = try std.DynamicBitSetUnmanaged.initEmpty(allocator, thread_count);
+    errdefer serial.deinit(allocator);
+    const threads = try allocator.alloc(DynamicThread, thread_count);
+    errdefer allocator.free(threads);
+    @memset(threads, .{});
+    return .{
+        .allocator = allocator,
+        .timeout = options.timeout,
+        .serial = serial,
+        .threads = threads,
+    };
 }
 
 pub fn deinit(self: *@This()) void {
@@ -118,6 +119,8 @@ pub fn spawn(self: *@This(), comptime func: anytype, args: anytype) SpawnError!v
         }
     }
 
+    // TODO: Optimize closure allocations
+    // Closures are often same size, so they can be bucketed and reused
     const closure = try self.allocator.create(Closure);
     closure.* = .{ .arguments = args };
     self.run_queue.prepend(&closure.run_node);
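One subtlety in the single-threaded branch of init() above: the returned struct leaves allocator, timeout, and serial undefined. Since builtin.single_threaded is comptime-known, the thread-management paths that would read those fields are presumably compiled out; treat this as an inference from the diff, not a documented guarantee.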
