diff --git a/src/aio.zig b/src/aio.zig index 92e086d..9c56a74 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -62,7 +62,7 @@ pub const CompletionResult = struct { /// Queue operations dynamically and complete them on demand pub const Dynamic = struct { - pub const Uop = ops.Operation.Union; + pub const Uop = Operation.Union; pub const QueueCallback = *const fn (uop: Uop, id: Id) void; pub const CompletionCallback = *const fn (uop: Uop, id: Id, failed: bool) void; @@ -85,18 +85,13 @@ pub const Dynamic = struct { /// The call is atomic, if any of the operations fail to queue, then the given operations are reverted pub inline fn queue(self: *@This(), operations: anytype) Error!void { const ti = @typeInfo(@TypeOf(operations)); - if (comptime ti == .Struct and ti.Struct.is_tuple) { + if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) { if (comptime operations.len == 0) @compileError("no work to be done"); - var uops: [operations.len]ops.Operation.Union = undefined; - inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op); - return self.io.queue(operations.len, &uops, self.queue_callback); - } else if (comptime ti == .Array) { - if (comptime operations.len == 0) @compileError("no work to be done"); - var uops: [operations.len]ops.Operation.Union = undefined; - inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op); + var uops: [operations.len]Operation.Union = undefined; + inline for (operations, &uops) |op, *uop| uop.* = Operation.uopFromOp(op); return self.io.queue(operations.len, &uops, self.queue_callback); } else { - var uops: [1]ops.Operation.Union = .{ops.Operation.uopFromOp(operations)}; + var uops: [1]Operation.Union = .{Operation.uopFromOp(operations)}; return self.io.queue(1, &uops, self.queue_callback); } } @@ -132,15 +127,10 @@ pub const Dynamic = struct { /// Returns the number of errors occured, 0 if there were no errors pub inline fn complete(operations: anytype) Error!u16 { const ti = @typeInfo(@TypeOf(operations)); - if (comptime ti == .Struct and ti.Struct.is_tuple) { - if (comptime operations.len == 0) @compileError("no work to be done"); - var uops: [operations.len]ops.Operation.Union = undefined; - inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op); - return IO.immediate(operations.len, &uops); - } else if (comptime ti == .Array) { + if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) { if (comptime operations.len == 0) @compileError("no work to be done"); - var uops: [operations.len]ops.Operation.Union = undefined; - inline for (operations, &uops) |op, *uop| uop.* = ops.Operation.uopFromOp(op); + var uops: [operations.len]Operation.Union = undefined; + inline for (operations, &uops) |op, *uop| uop.* = Operation.uopFromOp(op); return IO.immediate(operations.len, &uops); } else { @compileError("expected a tuple or array of operations"); @@ -203,6 +193,7 @@ const IO = switch (builtin.target.os.tag) { }; const ops = @import("aio/ops.zig"); +pub const Operation = ops.Operation; pub const Id = ops.Id; pub const Nop = ops.Nop; pub const Fsync = ops.Fsync; diff --git a/src/coro/io.zig b/src/coro/io.zig index c8d940e..b3bc9f6 100644 --- a/src/coro/io.zig +++ b/src/coro/io.zig @@ -24,19 +24,30 @@ pub fn do(operations: anytype, status: Frame.Status) Error!u16 { if (Frame.current()) |frame| { if (frame.canceled) return error.Canceled; - var work = struct { ops: @TypeOf(operations) }{ .ops = operations }; var whole: WholeContext = .{ .num_operations = operations.len, .frame = frame }; var ctx_list: [operations.len]OperationContext = undefined; - inline for (&work.ops, &ctx_list) |*op, *ctx| { - ctx.* = .{ .whole = &whole }; - op.userdata = @intFromPtr(ctx); + const ti = @typeInfo(@TypeOf(operations)); + if (comptime (ti == .Struct and ti.Struct.is_tuple) or ti == .Array) { + if (comptime operations.len == 0) @compileError("no work to be done"); + var uops: [operations.len]aio.Dynamic.Uop = undefined; + inline for (operations, &uops, &ctx_list, 0..) |op, *uop, *ctx, idx| { + ctx.* = .{ .whole = &whole }; + var cpy = op; + cpy.userdata = @intFromPtr(ctx); + // coro io operations get merged into one, having .link on last operation is always a mistake + if (idx == operations.len - 1) cpy.link = .unlinked; + uop.* = aio.Operation.uopFromOp(cpy); + } + try frame.scheduler.io.io.queue(operations.len, &uops, frame.scheduler.io.queue_callback); + } else { + var cpy = operations; + cpy.userdata = @intFromPtr(&ctx_list[0]); + cpy.link = .unlinked; + var uops: [1]aio.Dynamic.Uop = .{aio.Operation.uopFromOp(cpy)}; + try frame.scheduler.io.io.queue(1, &uops, frame.scheduler.io.queue_callback); } - // coro io operations get merged into one, having .link on last operation is always a mistake - work.ops[operations.len - 1].link = .unlinked; - - try frame.scheduler.io.queue(work.ops); // wait until scheduler actually submits our work const ack = frame.scheduler.io_ack; while (ack == frame.scheduler.io_ack) {