Commit

coro: take the coro api to different direction
Cloudef committed Jun 24, 2024
1 parent 056c7b3 commit 58ab058
Showing 16 changed files with 701 additions and 609 deletions.
34 changes: 8 additions & 26 deletions docs/pages/coro-context-switches.mdx
@@ -4,38 +4,20 @@

To yield a running task to the caller, use the following.
The function takes an enum value as an argument representing the yield state of the task.
The enum value that corresponds to the integer `0` is reserved to indicate the non-yielded state.

```zig
coro.yield(SomeEnum.value);
```
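For illustration, a minimal sketch of defining such a yield enum (the `Yield` name and its values are hypothetical; only the rule that integer `0` is the reserved non-yield state comes from the text above):

```zig
const Yield = enum {
    not_yielded, // maps to integer 0: reserved, never passed to coro.yield
    server_ready,
};

// Inside a task:
coro.yield(Yield.server_ready);
```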

To continue running the task from where it left off, issue the same enum value to the following function.
If you specify the third parameter as `.wait`, the current task will yield (or block, if not called from a task) until the
target task yields to that particular state. If instead the third parameter is `.no_wait`, the call is a no-op
when the task isn't currently yielded in the supplied state.
The current yield state of a task can be checked with the `state` method.
To wake up a task use the `wakeup` method. When a task is woken up its yield state is reset to `0`.
Calling `wakeup` when the task isn't yielded in an application-defined yield state is an error.

```zig
coro.wakeupFromState(task, SomeEnum.value, .wait);
```

This is the preferred way to handle the control flow between tasks.

## Canceling IO

While it's possible to cancel IO using `aio.Cancel` operations, it is also possible to cancel
all IO operations currently blocking a task by doing the following.
If the task currently isn't being yielded by IO, the call is a no-op.
An example of checking the current yield state and then waking up the task:

```zig
coro.wakeupFromIo(task);
switch (task.state(SomeEnum)) {
    .value => task.wakeup(),
    else => {},
}
```

## Unpaired wakeup

Sometimes it's useful to be able to wake up a task from any yield state.

```zig
coro.wakeup(task);
```

In this case the task will wake up regardless of its current yield state.
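Putting the above together, a hedged sketch of the handshake between two tasks (the `Yield` enum and the function names are illustrative, assuming the `coro` API shown in this section):

```zig
const Yield = enum {
    not_yielded, // 0: reserved non-yield state
    ready,
};

fn worker() void {
    // Park this task until something wakes it from the `ready` state.
    coro.yield(Yield.ready);
    // Execution resumes here; the yield state has been reset to 0.
}

// Elsewhere, holding a handle to the worker task:
fn poke(task: coro.Task) void {
    switch (task.state(Yield)) {
        .ready => task.wakeup(), // legal only while the task is yielded
        else => {},              // not in an application yield state: do nothing
    }
}
```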
3 changes: 1 addition & 2 deletions docs/pages/coro-io.mdx
@@ -25,5 +25,4 @@ outside a task then the call would be equal to calling the equal function from t
Use the `aio.Cancel` operation to cancel the currently running operations in a task.
The `out_error` of such an operation will then be set to `error.OperationCanceled`.

Alternatively it's possible to call `scheduler.wakeup(task);` or `scheduler.wakeupFromIo(task)`
which also cancels all currently running IO on that task.
Alternatively it's possible to call `task.complete(.cancel);` to actively cancel a task and collect its partial result.
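As a sketch, cancelling a task that is blocked on IO might look like this (assuming `task.complete(.cancel)` behaves as described above; the entrypoint name is hypothetical):

```zig
var task = try scheduler.spawn(downloadFile, .{}, .{});
// Actively cancels any IO still running in the task; blocked operations
// report `error.OperationCanceled` through their `out_error`, and the
// task's partial result is collected.
const partial = task.complete(.cancel);
```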
27 changes: 14 additions & 13 deletions docs/pages/coro-scheduler.mdx
@@ -31,39 +31,40 @@ yields or performs an IO operation using one of the `coro.io` namespace functions
var task = try scheduler.spawn(entrypoint, .{ 1, "args" }, .{});
```

### Reaping tasks
### Collecting result from a task

The following removes a task, freeing its memory and canceling all running IO operations for that task.
The reap may be delayed if the task is currently doing IO; the IO operations will be actively canceled.
Use the following to collect the result of a task.
After collecting the result, the task handle is no longer valid.

```zig
scheduler.reap(task);
const res = task.collect(.wait);
```

Alternatively, reap all tasks using the following.
The reap may be delayed if the tasks are currently doing IO; the IO operations will be actively canceled.
To cancel a task and collect its partial result, use the following.

```zig
scheduler.reapAll();
const res = task.collect(.cancel);
```

A call to `deinit` also reaps all tasks.

### Running

The scheduler can process tasks and IO one step at a time with the `tick` method.
Running `tick` reaps tasks that have returned (dead tasks) and context switches back to
tasks whose IO operations have completed.

```zig
// if there are pending IO operations, blocks until at least one completes
// If there are pending IO operations, blocks until at least one completes
try scheduler.tick(.blocking);
// returns immediately regardless of the current IO state
// Returns immediately regardless of the current IO state
try scheduler.tick(.nonblocking);
```

To run the scheduler until all tasks have returned, aka died, use the following.
To run the scheduler until all tasks have completed, use the following.
The `mode` option lets you decide whether to wait for all tasks to finish or to actively try to cancel them.

```zig
try scheduler.run();
// Wait for the tasks to finish
try scheduler.run(.wait);
// Actively cancel the tasks
try scheduler.run(.cancel);
```
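A minimal end-to-end sketch of the scheduler lifecycle described on this page (the entrypoint and its arguments are illustrative):

```zig
var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
defer _ = gpa.deinit();

var scheduler = try coro.Scheduler.init(gpa.allocator(), .{});
defer scheduler.deinit(); // deinit also reaps any remaining tasks

_ = try scheduler.spawn(entrypoint, .{ 1, "args" }, .{});
// Drive everything until the tasks finish on their own ...
try scheduler.run(.wait);
// ... or tear down early by actively cancelling them instead:
// try scheduler.run(.cancel);
```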
19 changes: 8 additions & 11 deletions examples/coro.zig
@@ -15,11 +15,7 @@ pub const std_options: std.Options = .{
.log_level = .debug,
};

const Yield = enum {
server_ready,
};

fn server(client_task: coro.Task) !void {
fn server(lock: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
@@ -36,7 +32,7 @@ fn server(client_task: coro.Task) !void {
try std.posix.bind(socket, &address.any, address.getOsSockLen());
try std.posix.listen(socket, 128);

coro.wakeupFromState(client_task, Yield.server_ready, .wait);
lock.set();

var client_sock: std.posix.socket_t = undefined;
try coro.io.single(aio.Accept{ .socket = socket, .out_socket = &client_sock });
@@ -58,7 +54,7 @@ fn server(client_task: coro.Task) !void {
});
}

fn client() !void {
fn client(lock: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
@@ -67,7 +63,7 @@ fn client() !void {
.out_socket = &socket,
});

coro.yield(Yield.server_ready);
lock.wait();

const address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 1327);
try coro.io.single(aio.Connect{
@@ -99,7 +95,8 @@ pub fn main() !void {
defer _ = gpa.deinit();
var scheduler = try coro.Scheduler.init(gpa.allocator(), .{});
defer scheduler.deinit();
const client_task = try scheduler.spawn(client, .{}, .{});
_ = try scheduler.spawn(server, .{client_task}, .{});
try scheduler.run();
var lock: coro.ResetEvent = .{};
_ = try scheduler.spawn(client, .{&lock}, .{});
_ = try scheduler.spawn(server, .{&lock}, .{});
try scheduler.run(.wait);
}
12 changes: 5 additions & 7 deletions src/aio.zig
@@ -49,7 +49,7 @@ pub const CompletionResult = struct {
/// Queue operations dynamically and complete them on demand
pub const Dynamic = struct {
pub const Uop = ops.Operation.Union;
pub const Callback = *const fn (uop: Uop) void;
pub const Callback = *const fn (uop: Uop, id: Id, failed: bool) void;
io: IO,
/// Used by coro implementation
callback: ?Callback = null,
@@ -135,11 +135,8 @@ pub inline fn multi(operations: anytype) (Error || error{SomeOperationFailed})!v
pub inline fn single(operation: anytype) (Error || @TypeOf(operation).Error)!void {
var op: @TypeOf(operation) = operation;
var err: @TypeOf(operation).Error = error.Success;
if (@hasField(@TypeOf(op), "out_error")) {
op.out_error = &err;
}
_ = try complete(.{op});
if (err != error.Success) return err;
if (@hasField(@TypeOf(op), "out_error")) op.out_error = &err;
if (try complete(.{op}) > 0) return err;
}

/// Checks if the current backend supports the operations
@@ -231,9 +228,10 @@ test "Nop" {
defer dynamic.deinit(std.testing.allocator);
try dynamic.queue(Nop{ .domain = @enumFromInt(255), .ident = 69, .userdata = 42 });
const Lel = struct {
fn cb(uop: Dynamic.Uop) void {
fn cb(uop: Dynamic.Uop, _: Id, failed: bool) void {
switch (uop) {
.nop => |*op| {
std.debug.assert(!failed);
std.debug.assert(255 == @intFromEnum(op.domain));
std.debug.assert(69 == op.ident);
std.debug.assert(42 == op.userdata);
21 changes: 9 additions & 12 deletions src/aio/Fallback.zig
@@ -100,12 +100,8 @@ pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
}

fn initOp(op: anytype, id: u16) void {
if (comptime @hasField(@TypeOf(op.*), "out_id")) {
if (op.out_id) |p_id| p_id.* = @enumFromInt(id);
}
if (comptime @hasField(@TypeOf(op.*), "out_error")) {
if (op.out_error) |out_error| out_error.* = error.Success;
}
if (op.out_id) |p_id| p_id.* = @enumFromInt(id);
if (op.out_error) |out_error| out_error.* = error.Success;
}

fn addOp(self: *@This(), uop: Operation.Union, linked_to: ?u16, readiness: posix.Readiness) !u16 {
@@ -334,10 +330,7 @@ fn submit(self: *@This()) !bool {
}

fn completition(op: anytype, self: *@This(), res: Result) void {
if (comptime @hasField(@TypeOf(op.*), "out_error")) {
if (op.out_error) |err| err.* = @errorCast(res.failure);
}

if (op.out_error) |err| err.* = @errorCast(res.failure);
if (op.link != .unlinked and self.next[res.id] != res.id) {
if (self.ops.nodes[self.next[res.id]].used == .link_timeout) {
switch (op.link) {
@@ -369,14 +362,18 @@ fn handleFinished(self: *@This(), cb: ?aio.Dynamic.Callback) aio.CompletionResul
} else {
debug("complete: {}: {} [OK]", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used) });
}
defer self.removeOp(res.id);

if (self.ops.nodes[res.id].used == .link_timeout and res.failure == error.OperationCanceled) {
// special case
} else {
num_errors += @intFromBool(res.failure != error.Success);
}

uopUnwrapCall(&self.ops.nodes[res.id].used, completition, .{ self, res });
if (cb) |f| f(self.ops.nodes[res.id].used);

const uop = self.ops.nodes[res.id].used;
self.removeOp(res.id);
if (cb) |f| f(uop, @enumFromInt(res.id), res.failure != error.Success);
}

return .{ .num_completed = self.finished_copy.len, .num_errors = num_errors };
52 changes: 33 additions & 19 deletions src/aio/IoUring.zig
@@ -98,26 +98,32 @@ pub const NOP = std.math.maxInt(u64);

pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.Callback) aio.Error!aio.CompletionResult {
if (self.ops.empty()) return .{};

if (mode == .nonblocking) {
_ = self.io.nop(NOP) catch |err| return switch (err) {
error.SubmissionQueueFull => .{},
};
}

_ = try uring_submit(&self.io);

var result: aio.CompletionResult = .{};
var cqes: [aio.options.io_uring_cqe_sz]std.os.linux.io_uring_cqe = undefined;
const n = try uring_copy_cqes(&self.io, &cqes, 1);
for (cqes[0..n]) |*cqe| {
if (cqe.user_data == NOP) continue;
defer self.ops.remove(@intCast(cqe.user_data));
const uop = self.ops.get(@intCast(cqe.user_data));
switch (uop.*) {
const uop = self.ops.get(@intCast(cqe.user_data)).*;
var failed: bool = false;
switch (uop) {
inline else => |*op| uring_handle_completion(op, cqe) catch {
result.num_errors += 1;
failed = true;
},
}
if (cb) |f| f(uop.*);
self.ops.remove(@intCast(cqe.user_data));
if (cb) |f| f(uop, @enumFromInt(cqe.user_data), failed);
}

result.num_completed = n - @intFromBool(mode == .nonblocking);
return result;
}
@@ -271,12 +277,8 @@ inline fn uring_queue(io: *std.os.linux.IoUring, op: anytype, user_data: u64) ai
.soft => sqe.flags |= std.os.linux.IOSQE_IO_LINK,
.hard => sqe.flags |= std.os.linux.IOSQE_IO_HARDLINK,
}
if (comptime @hasField(@TypeOf(op.*), "out_id")) {
if (op.out_id) |id| id.* = @enumFromInt(user_data);
}
if (comptime @hasField(@TypeOf(op.*), "out_error")) {
if (op.out_error) |out_error| out_error.* = error.Success;
}
if (op.out_id) |id| id.* = @enumFromInt(user_data);
if (op.out_error) |out_error| out_error.* = error.Success;
}

inline fn uring_submit(io: *std.os.linux.IoUring) aio.Error!u16 {
@@ -320,7 +322,10 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
const err = cqe.err();
if (err != .SUCCESS) {
const res: @TypeOf(op.*).Error = switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
.nop => unreachable,
.nop => switch (err) {
.CANCELED => error.OperationCanceled,
else => std.posix.unexpectedErrno(err),
},
.fsync => switch (err) {
.SUCCESS, .INTR, .INVAL, .FAULT, .AGAIN, .ROFS => unreachable,
.BADF => unreachable, // not a file
@@ -454,25 +459,32 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
.ILSEQ => error.InvalidUtf8,
else => std.posix.unexpectedErrno(err),
},
.close_file, .close_dir, .close_socket => unreachable,
.notify_event_source, .wait_event_source, .close_event_source => unreachable,
.close_file, .close_dir, .close_socket => switch (err) {
.CANCELED => error.OperationCanceled,
else => std.posix.unexpectedErrno(err),
},
.notify_event_source, .wait_event_source, .close_event_source => switch (err) {
.CANCELED => error.OperationCanceled,
else => std.posix.unexpectedErrno(err),
},
.timeout => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.TIME => error.Success,
.CANCELED => error.OperationCanceled,
else => unreachable,
else => std.posix.unexpectedErrno(err),
},
.link_timeout => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.TIME => error.Expired,
.ALREADY, .CANCELED => error.OperationCanceled,
else => unreachable,
else => std.posix.unexpectedErrno(err),
},
.cancel => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.ALREADY => error.InProgress,
.NOENT => error.NotFound,
else => unreachable,
.CANCELED => error.OperationCanceled,
else => std.posix.unexpectedErrno(err),
},
.rename_at => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
@@ -519,6 +531,7 @@ inline fn uring_handle_completion(op: anytype, cqe: *std.os.linux.io_uring_cqe)
},
.mkdir_at => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN => unreachable,
.CANCELED => error.OperationCanceled,
.ACCES => error.AccessDenied,
.BADF => unreachable,
.PERM => error.AccessDenied,
@@ -539,6 +552,7 @@
},
.symlink_at => switch (err) {
.SUCCESS, .INTR, .INVAL, .AGAIN, .FAULT => unreachable,
.CANCELED => error.OperationCanceled,
.ACCES => error.AccessDenied,
.PERM => error.AccessDenied,
.DQUOT => error.DiskQuota,
@@ -555,11 +569,13 @@
},
.child_exit => switch (err) {
.SUCCESS, .INTR, .AGAIN, .FAULT, .INVAL => unreachable,
.CANCELED => error.OperationCanceled,
.CHILD => error.NotFound,
else => std.posix.unexpectedErrno(err),
},
.socket => switch (err) {
.SUCCESS, .INTR, .AGAIN, .FAULT => unreachable,
.CANCELED => error.OperationCanceled,
.ACCES => error.PermissionDenied,
.AFNOSUPPORT => error.AddressFamilyNotSupported,
.INVAL => error.ProtocolFamilyNotAvailable,
@@ -573,9 +589,7 @@
},
};

if (comptime @hasField(@TypeOf(op.*), "out_error")) {
if (op.out_error) |out_error| out_error.* = res;
}
if (op.out_error) |out_error| out_error.* = res;

if (res != error.Success) {
if ((comptime Operation.tagFromPayloadType(@TypeOf(op.*)) == .link_timeout) and res == error.OperationCanceled) {
