Skip to content

Commit

Permalink
coro: lower operations to uops in coro.do
Browse files Browse the repository at this point in the history
Fixes #33
  • Loading branch information
Cloudef committed Nov 13, 2024
1 parent 7f49eac commit fec83b4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
27 changes: 9 additions & 18 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub const CompletionResult = struct {

/// Queue operations dynamically and complete them on demand
pub const Dynamic = struct {
pub const Uop = ops.Operation.Union;
pub const Uop = Operation.Union;
pub const QueueCallback = *const fn (uop: Uop, id: Id) void;
pub const CompletionCallback = *const fn (uop: Uop, id: Id, failed: bool) void;

Expand All @@ -85,18 +85,13 @@ pub const Dynamic = struct {
/// The call is atomic, if any of the operations fail to queue, then the given operations are reverted
pub inline fn queue(self: *@This(), operations: anytype) Error!void {
const ti = @typeInfo(@TypeOf(operations));
if (comptime ti == .Struct and ti.Struct.is_tuple) {
if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) {
if (comptime operations.len == 0) @compileError("no work to be done");
var uops: [operations.len]ops.Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op);
return self.io.queue(operations.len, &uops, self.queue_callback);
} else if (comptime ti == .Array) {
if (comptime operations.len == 0) @compileError("no work to be done");
var uops: [operations.len]ops.Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op);
var uops: [operations.len]Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = Operation.uopFromOp(op);
return self.io.queue(operations.len, &uops, self.queue_callback);
} else {
var uops: [1]ops.Operation.Union = .{ops.Operation.uopFromOp(operations)};
var uops: [1]Operation.Union = .{Operation.uopFromOp(operations)};
return self.io.queue(1, &uops, self.queue_callback);
}
}
Expand Down Expand Up @@ -132,15 +127,10 @@ pub const Dynamic = struct {
/// Returns the number of errors occured, 0 if there were no errors
pub inline fn complete(operations: anytype) Error!u16 {
const ti = @typeInfo(@TypeOf(operations));
if (comptime ti == .Struct and ti.Struct.is_tuple) {
if (comptime operations.len == 0) @compileError("no work to be done");
var uops: [operations.len]ops.Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op);
return IO.immediate(operations.len, &uops);
} else if (comptime ti == .Array) {
if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) {
if (comptime operations.len == 0) @compileError("no work to be done");
var uops: [operations.len]ops.Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op);
var uops: [operations.len]Operation.Union = undefined;
inline for (operations, &uops) |op, *uop| uop.* = Operation.uopFromOp(op);
return IO.immediate(operations.len, &uops);
} else {
@compileError("expected a tuple or array of operations");
Expand Down Expand Up @@ -203,6 +193,7 @@ const IO = switch (builtin.target.os.tag) {
};

const ops = @import("aio/ops.zig");
pub const Operation = ops.Operation;
pub const Id = ops.Id;
pub const Nop = ops.Nop;
pub const Fsync = ops.Fsync;
Expand Down
27 changes: 19 additions & 8 deletions src/coro/io.zig
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,30 @@ pub fn do(operations: anytype, status: Frame.Status) Error!u16 {
if (Frame.current()) |frame| {
if (frame.canceled) return error.Canceled;

var work = struct { ops: @TypeOf(operations) }{ .ops = operations };
var whole: WholeContext = .{ .num_operations = operations.len, .frame = frame };
var ctx_list: [operations.len]OperationContext = undefined;

inline for (&work.ops, &ctx_list) |*op, *ctx| {
ctx.* = .{ .whole = &whole };
op.userdata = @intFromPtr(ctx);
const ti = @typeInfo(@TypeOf(operations));
if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) {
if (comptime operations.len == 0) @compileError("no work to be done");
var uops: [operations.len]aio.Dynamic.Uop = undefined;
inline for (operations, &uops, &ctx_list, 0..) |op, *uop, *ctx, idx| {
ctx.* = .{ .whole = &whole };
var cpy = op;
cpy.userdata = @intFromPtr(ctx);
// coro io operations get merged into one, having .link on last operation is always a mistake
if (idx == operations.len - 1) cpy.link = .unlinked;
uop.* = aio.Operation.uopFromOp(cpy);
}
try frame.scheduler.io.io.queue(operations.len, &uops, frame.scheduler.io.queue_callback);
} else {
var cpy = operations;
cpy.userdata = @intFromPtr(&ctx_list[0]);
cpy.link = .unlinked;
var uops: [1]aio.Dynamic.Uop = .{aio.Operation.uopFromOp(cpy)};
try frame.scheduler.io.io.queue(1, &uops, frame.scheduler.io.queue_callback);
}

// coro io operations get merged into one, having .link on last operation is always a mistake
work.ops[operations.len - 1].link = .unlinked;

try frame.scheduler.io.queue(work.ops);
// wait until scheduler actually submits our work
const ack = frame.scheduler.io_ack;
while (ack == frame.scheduler.io_ack) {
Expand Down

0 comments on commit fec83b4

Please sign in to comment.