aio/coro: move common utilities to minilib module
Cloudef committed Jun 27, 2024 · 1 parent 2900a34 · commit 3c66f5a
Showing 14 changed files with 363 additions and 158 deletions.
11 changes: 10 additions & 1 deletion build.zig
@@ -8,19 +8,27 @@ pub fn build(b: *std.Build) void {
const fallback = b.option(bool, "fallback", "use fallback event loop") orelse false;
opts.addOption(bool, "fallback", fallback);

const minilib = b.addModule("minilib", .{
.root_source_file = b.path("src/minilib.zig"),
.target = target,
.optimize = optimize,
});

const aio = b.addModule("aio", .{
.root_source_file = b.path("src/aio.zig"),
.target = target,
.optimize = optimize,
.link_libc = target.query.os_tag == .windows,
});
aio.addImport("minilib", minilib);
aio.addImport("build_options", opts.createModule());

const coro = b.addModule("coro", .{
.root_source_file = b.path("src/coro.zig"),
.target = target,
.optimize = optimize,
});
coro.addImport("minilib", minilib);
coro.addImport("aio", aio);

const run_all = b.step("run", "Run all examples");
@@ -48,14 +56,15 @@ pub fn build(b: *std.Build) void {

const test_filter = b.option([]const u8, "test-filter", "Skip tests that do not match any filter") orelse "";
const test_step = b.step("test", "Run unit tests");
inline for (.{ .aio, .coro }) |mod| {
inline for (.{ .minilib, .aio, .coro }) |mod| {
const tst = b.addTest(.{
.root_source_file = b.path("src/" ++ @tagName(mod) ++ ".zig"),
.target = target,
.optimize = optimize,
.filters = &.{test_filter},
.link_libc = target.query.os_tag == .windows,
});
if (mod != .minilib) tst.root_module.addImport("minilib", minilib);
if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule());
if (mod == .coro) tst.root_module.addImport("aio", aio);
const run = b.addRunArtifact(tst);
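For downstream projects nothing changes: `minilib` is wired in as an internal import of `aio` and `coro`, so consumers keep importing just the two public modules. A minimal consumer sketch — the dependency name "zig_aio" and the `b`, `exe`, `target`, `optimize` variables are assumptions, not part of this commit:

// Hypothetical consumer build.zig fragment.
const dep = b.dependency("zig_aio", .{ .target = target, .optimize = optimize });
exe.root_module.addImport("aio", dep.module("aio"));
exe.root_module.addImport("coro", dep.module("coro"));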
2 changes: 1 addition & 1 deletion examples/coro_wttr.zig
@@ -70,7 +70,7 @@ pub fn main() !void {
const ltask = try scheduler.spawn(loader, .{ &completed, &max }, .{});

var tpool: coro.ThreadPool = .{};
try tpool.start(gpa.allocator(), 1);
try tpool.start(gpa.allocator(), .{});
defer tpool.deinit();

var tasks = std.ArrayList(coro.Task.Generic(anyerror![]const u8)).init(allocator);
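The second argument to `ThreadPool.start` is now an options struct rather than a raw thread count. A sketch of the equivalent explicit call, using the `num_threads` field that appears later in this diff (in src/aio/Fallback.zig):

try tpool.start(gpa.allocator(), .{ .num_threads = 1 });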
28 changes: 19 additions & 9 deletions src/aio/Fallback.zig
@@ -3,9 +3,10 @@ const std = @import("std");
const aio = @import("../aio.zig");
const posix = @import("posix.zig");
const Operation = @import("ops.zig").Operation;
const Pool = @import("common.zig").Pool;
const FixedArrayList = @import("common.zig").FixedArrayList;
const DoubleBufferedFixedArrayList = @import("common.zig").DoubleBufferedFixedArrayList;
const ItemPool = @import("minilib").ItemPool;
const FixedArrayList = @import("minilib").FixedArrayList;
const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArrayList;
const DynamicThreadPool = @import("minilib").DynamicThreadPool;

// This tries to emulate io_uring functionality.
// If something does not match how it works on io_uring on linux, it should be changed to match.
@@ -14,19 +15,28 @@ const DoubleBufferedFixedArrayList = @import("common.zig").DoubleBufferedFixedAr
// However it might still be a more pleasant experience than (e)poll/kqueueing away, as the behaviour should be
// more or less consistent.

comptime {
if (builtin.single_threaded) {
@compileError(
\\Fallback backend requires building with threads as otherwise it may block the whole program.
\\To only target linux and io_uring, set `aio_options.fallback = .disable` in your root .zig file.
);
}
}

pub const EventSource = posix.EventSource;

const Result = struct { failure: Operation.Error, id: u16 };

ops: Pool(Operation.Union, u16),
ops: ItemPool(Operation.Union, u16),
prev_id: ?u16 = null, // for linking operations
next: []u16, // linked operation, points to self if none
readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation
link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operation to finish first
pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll)
started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false
pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup
tpool: *std.Thread.Pool, // thread pool for performing operations, not all operations will be performed here
tpool: *DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here
source: EventSource, // when threads finish, they signal it using this event source
finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe

@@ -53,7 +63,7 @@ fn minThreads() u32 {
}

pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
var ops = try Pool(Operation.Union, u16).init(allocator, n);
var ops = try ItemPool(Operation.Union, u16).init(allocator, n);
errdefer ops.deinit(allocator);
const next = try allocator.alloc(u16, n);
errdefer allocator.free(next);
@@ -67,11 +77,11 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
errdefer started.deinit(allocator);
var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1);
errdefer pfd.deinit(allocator);
var tpool = try allocator.create(std.Thread.Pool);
var tpool = try allocator.create(DynamicThreadPool);
errdefer allocator.destroy(tpool);
const thread_count: u32 = aio.options.num_threads orelse @intCast(@max(minThreads(), std.Thread.getCpuCount() catch 1));
tpool.init(.{ .allocator = allocator, .n_jobs = thread_count }) catch |err| return switch (err) {
error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources,
tpool.init(allocator, .{ .num_threads = thread_count }) catch |err| return switch (err) {
error.TimerUnsupported => error.SystemOutdated,
else => |e| e,
};
errdefer tpool.deinit();
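The new compile error above points at the `aio_options` root-override mechanism. A sketch of a root file opting out of the fallback backend — `aio.Options` as the type is an assumption; only the `aio_options.fallback = .disable` hint comes from the error message itself:

// Hypothetical root.zig (sketch).
const aio = @import("aio");

pub const aio_options: aio.Options = .{
    .fallback = .disable,
};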
8 changes: 4 additions & 4 deletions src/aio/IoUring.zig
@@ -1,7 +1,7 @@
const std = @import("std");
const aio = @import("../aio.zig");
const Operation = @import("ops.zig").Operation;
const Pool = @import("common.zig").Pool;
const ItemPool = @import("minilib").ItemPool;
const posix = @import("posix.zig");
const linux = @import("posix/linux.zig");
const log = std.log.scoped(.io_uring);
@@ -37,10 +37,10 @@ const Supported = struct {
// Could also make an io_uring based event source with `IORING_OP_MSG_RING`
// However I've read some claims that passing messages to another ring has more
// latency than actually using eventfd. Eventfd is simple and reliable.
pub const EventSource = posix.EventSource;
pub const EventSource = linux.EventSource;

io: std.os.linux.IoUring,
ops: Pool(Operation.Union, u16),
ops: ItemPool(Operation.Union, u16),

pub inline fn isSupported(op_types: []const type) bool {
var ops: [op_types.len]std.os.linux.IORING_OP = undefined;
@@ -78,7 +78,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
const n2 = std.math.ceilPowerOfTwo(u16, n) catch unreachable;
var io = try uring_init(n2);
errdefer io.deinit();
const ops = try Pool(Operation.Union, u16).init(allocator, n2);
const ops = try ItemPool(Operation.Union, u16).init(allocator, n2);
errdefer ops.deinit(allocator);
return .{ .io = io, .ops = ops };
}
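io_uring wants a power-of-two queue depth, which is why `init` rounds `n` up with `std.math.ceilPowerOfTwo` before creating the ring and sizing the pool. In isolation:

const std = @import("std");
// A request for 100 entries rounds up to the next power of two, 128.
const n2 = std.math.ceilPowerOfTwo(u16, 100) catch unreachable; // n2 == 128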
6 changes: 3 additions & 3 deletions src/coro/Frame.zig
@@ -2,10 +2,10 @@ const std = @import("std");
const aio = @import("aio");
const Fiber = @import("zefi.zig");
const Scheduler = @import("Scheduler.zig");
const common = @import("common.zig");
const Link = @import("minilib").Link;
const log = std.log.scoped(.coro);

pub const List = std.DoublyLinkedList(common.Link(@This(), "link", .double));
pub const List = std.DoublyLinkedList(Link(@This(), "link", .double));
pub const stack_alignment = Fiber.stack_alignment;
pub const Stack = Fiber.Stack;

@@ -23,7 +23,7 @@ pub const Status = enum(u8) {
}
};

pub const WaitList = std.SinglyLinkedList(common.Link(@This(), "wait_link", .single));
pub const WaitList = std.SinglyLinkedList(Link(@This(), "wait_link", .single));

fiber: *Fiber,
stack: ?Fiber.Stack = null,
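minilib's `Link` replaces the old `common.Link`; its implementation is not part of this diff. A plausible minimal sketch of what such a helper does — let a `std.DoublyLinkedList`/`std.SinglyLinkedList` node recover the struct that embeds it — assuming Zig 0.13's `@fieldParentPtr` semantics:

// Sketch only; the real Link lives in src/minilib.zig, which this diff omits.
const std = @import("std");

pub fn Link(comptime T: type, comptime field: []const u8, comptime kind: enum { single, double }) type {
    return struct {
        const Node = switch (kind) {
            .single => std.SinglyLinkedList(@This()).Node,
            .double => std.DoublyLinkedList(@This()).Node,
        };
        /// Recover the parent struct that embeds this node in `field`.
        pub fn cast(self: *@This()) *T {
            const node: *Node = @fieldParentPtr("data", self);
            return @fieldParentPtr(field, node);
        }
    };
}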
9 changes: 5 additions & 4 deletions src/coro/Scheduler.zig
@@ -3,9 +3,8 @@ const aio = @import("aio");
const io = @import("io.zig");
const Frame = @import("Frame.zig");
const Task = @import("Task.zig");
const common = @import("common.zig");
const ReturnType = @import("minilib").ReturnType;
const options = @import("../coro.zig").options;
const ReturnType = common.ReturnType;

allocator: std.mem.Allocator,
io: aio.Dynamic,
@@ -100,20 +99,22 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void {
}

fn ioQueue(uop: aio.Dynamic.Uop, id: aio.Id) void {
const OperationContext = @import("io.zig").OperationContext;
switch (uop) {
inline else => |*op| {
std.debug.assert(op.userdata != 0);
var ctx: *common.OperationContext = @ptrFromInt(op.userdata);
var ctx: *OperationContext = @ptrFromInt(op.userdata);
ctx.id = id;
},
}
}

fn ioCompletition(uop: aio.Dynamic.Uop, _: aio.Id, failed: bool) void {
const OperationContext = @import("io.zig").OperationContext;
switch (uop) {
inline else => |*op| {
std.debug.assert(op.userdata != 0);
var ctx: *common.OperationContext = @ptrFromInt(op.userdata);
var ctx: *OperationContext = @ptrFromInt(op.userdata);
var frame: *Frame = ctx.whole.frame;
std.debug.assert(ctx.whole.num_operations > 0);
ctx.completed = true;
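Both callbacks above rely on the same trick: the operation's integer `userdata` smuggles a pointer to a per-operation context, so the completion side can find the coroutine frame again. The round-trip in isolation — a self-contained sketch, not code from this commit:

const std = @import("std");

const Ctx = struct { completed: bool = false };

test "userdata pointer round-trip" {
    var ctx: Ctx = .{};
    const userdata: usize = @intFromPtr(&ctx); // stored on the queued operation
    const back: *Ctx = @ptrFromInt(userdata); // recovered in the completion callback
    back.completed = true;
    try std.testing.expect(ctx.completed);
}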
3 changes: 1 addition & 2 deletions src/coro/Task.zig
@@ -1,7 +1,6 @@
const std = @import("std");
const aio = @import("aio");
const ReturnType = @import("common.zig").ReturnType;
const ReturnTypeWithError = @import("common.zig").ReturnTypeWithError;
const ReturnType = @import("minilib").ReturnType;
const Frame = @import("Frame.zig");

const Task = @This();
30 changes: 19 additions & 11 deletions src/coro/ThreadPool.zig
@@ -4,19 +4,27 @@ const io = @import("io.zig");
const Scheduler = @import("Scheduler.zig");
const Task = @import("Task.zig");
const Frame = @import("Frame.zig");
const ReturnTypeWithError = @import("common.zig").ReturnTypeWithError;
const ReturnType = @import("common.zig").ReturnType;
const DynamicThreadPool = @import("minilib").DynamicThreadPool;
const ReturnType = @import("minilib").ReturnType;
const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet;

pool: std.Thread.Pool = undefined,
pool: DynamicThreadPool = undefined,
source: aio.EventSource = undefined,
num_tasks: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),

/// Spin up the pool; `allocator` is used to allocate the tasks
/// If `num_jobs` is zero, the thread count for the current CPU is used
pub fn start(self: *@This(), allocator: std.mem.Allocator, num_jobs: u32) !void {
self.* = .{ .pool = .{ .allocator = undefined, .threads = undefined }, .source = try aio.EventSource.init() };
/// If `num_threads` is null, the thread count for the current CPU is used
pub fn start(self: *@This(), allocator: std.mem.Allocator, options: DynamicThreadPool.Options) !void {
self.* = .{
.pool = .{
.allocator = undefined,
.threads = undefined,
.timeout = undefined,
},
.source = try aio.EventSource.init(),
};
errdefer self.source.deinit();
try self.pool.init(.{ .allocator = allocator, .n_jobs = if (num_jobs == 0) null else num_jobs });
try self.pool.init(allocator, options);
}

pub fn deinit(self: *@This()) void {
@@ -42,7 +50,7 @@ inline fn entrypoint(self: *@This(), completed: *bool, token: *CancellationToken
}

/// Yield until `func` finishes on another thread
pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnTypeWithError(func, std.Thread.SpawnError) {
pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype) ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError) {
var completed: bool = false;
var res: ReturnType(func) = undefined;
_ = self.num_tasks.fetchAdd(1, .monotonic);
@@ -77,12 +85,12 @@ pub fn spawnAnyForCompletition(self: *@This(), scheduler: *Scheduler, Result: ty

/// Helper for getting the Task.Generic when using spawnForCompletition tasks.
pub fn Generic2(comptime func: anytype) type {
return Task.Generic(ReturnTypeWithError(func, std.Thread.SpawnError));
return Task.Generic(ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError));
}

/// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result
pub fn spawnForCompletition(self: *@This(), scheduler: *Scheduler, func: anytype, args: anytype, opts: Scheduler.SpawnOptions) Scheduler.SpawnError!Generic2(func) {
const Result = ReturnTypeWithError(func, std.Thread.SpawnError);
const Result = ReturnTypeMixedWithErrorSet(func, std.Thread.SpawnError);
const task = try self.spawnAnyForCompletition(scheduler, Result, func, args, opts);
return task.generic(Result);
}
@@ -117,7 +125,7 @@ test "ThreadPool" {
defer scheduler.deinit();

var pool: ThreadPool = .{};
try pool.start(std.testing.allocator, 0);
try pool.start(std.testing.allocator, .{});
defer pool.deinit();

for (0..10) |_| _ = try scheduler.spawn(Test.task, .{&pool}, .{});
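Typical use of `yieldForCompletition`, sketched under the assumption of a hypothetical blocking helper `readAll`; only the `ThreadPool` API itself comes from this file:

// Inside a coroutine: off-load a blocking call to the pool and suspend
// this frame until it finishes. `readAll` is hypothetical.
fn fetchHostname(pool: *ThreadPool) ![]const u8 {
    return pool.yieldForCompletition(readAll, .{"/etc/hostname"});
}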
47 changes: 0 additions & 47 deletions src/coro/common.zig

This file was deleted.

17 changes: 14 additions & 3 deletions src/coro/io.zig
@@ -2,7 +2,18 @@ const std = @import("std");
const aio = @import("aio");
const Scheduler = @import("Scheduler.zig");
const Frame = @import("Frame.zig");
const common = @import("common.zig");

pub const WholeContext = struct {
num_operations: u16,
num_errors: u16 = 0,
frame: *Frame,
};

pub const OperationContext = struct {
whole: *WholeContext,
id: ?aio.Id = null,
completed: bool = false,
};

pub const Error = aio.Error || error{Canceled};

@@ -12,8 +23,8 @@ pub fn do(operations: anytype, status: Frame.Status) Error!u16 {
if (frame.canceled) return error.Canceled;

var work = struct { ops: @TypeOf(operations) }{ .ops = operations };
var whole: common.WholeContext = .{ .num_operations = operations.len, .frame = frame };
var ctx_list: [operations.len]common.OperationContext = undefined;
var whole: WholeContext = .{ .num_operations = operations.len, .frame = frame };
var ctx_list: [operations.len]OperationContext = undefined;

inline for (&work.ops, &ctx_list) |*op, *ctx| {
ctx.* = .{ .whole = &whole };
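For context, `do` is the primitive beneath the public `coro.io` wrappers. A sketch of a call site — the `coro.io.single` wrapper and the `aio.Read` field names are assumptions about the surrounding API, not shown in this diff:

// Hypothetical coroutine body: queue one read and suspend until it completes.
var buf: [1024]u8 = undefined;
var len: usize = 0;
try coro.io.single(aio.Read{ .file = file, .buffer = &buf, .out_read = &len });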
