From 6e9e76723d0f74f4509455279ca7554e353203f3 Mon Sep 17 00:00:00 2001
From: Jari Vetoniemi
Date: Thu, 4 Jul 2024 23:17:55 +0900
Subject: [PATCH] aio: more wip windows work

---
 build.zig                 |   9 +-
 src/aio/posix/windows.zig |  23 ++++-
 src/aio/windows.zig       | 205 +++++++++++++++++++++++++++++++++-----
 3 files changed, 208 insertions(+), 29 deletions(-)

diff --git a/build.zig b/build.zig
index e45eb2c..24bcf86 100644
--- a/build.zig
+++ b/build.zig
@@ -85,7 +85,14 @@ pub fn build(b: *std.Build) void {
             .sanitize_thread = sanitize,
         });
         if (mod != .minilib) tst.root_module.addImport("minilib", minilib);
-        if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule());
+        if (mod == .aio) {
+            tst.root_module.addImport("build_options", opts.createModule());
+            if (target.query.os_tag orelse builtin.os.tag == .windows) {
+                if (b.lazyDependency("zigwin32", .{})) |zigwin32| {
+                    tst.root_module.addImport("win32", zigwin32.module("zigwin32"));
+                }
+            }
+        }
         if (mod == .coro) tst.root_module.addImport("aio", aio);
         const run = b.addRunArtifact(tst);
         test_step.dependOn(&run.step);
diff --git a/src/aio/posix/windows.zig b/src/aio/posix/windows.zig
index dd19a98..deb2c4a 100644
--- a/src/aio/posix/windows.zig
+++ b/src/aio/posix/windows.zig
@@ -19,11 +19,24 @@ pub fn unexpectedWSAError(err: win32.networking.win_sock.WSA_ERROR) error{Unexpe
     return std.os.windows.unexpectedWSAError(@enumFromInt(@intFromEnum(err)));
 }
 
-pub fn checked(ret: win32.foundation.BOOL) void {
-    if (ret == 0) {
-        unexpectedError(GetLastError()) catch {};
-        unreachable;
-    }
+pub fn wtry(ret: anytype) !void {
+    const wbool: win32.foundation.BOOL = if (@TypeOf(ret) == bool)
+        @intFromBool(ret)
+    else
+        ret;
+    if (wbool == 0) return switch (GetLastError()) {
+        .ERROR_IO_PENDING => {}, // not an error
+        else => |r| unexpectedError(r),
+    };
+}
+
+pub fn werr(ret: anytype) ops.Operation.Error {
+    wtry(ret) catch |err| return err;
+    return error.Success;
+}
+
+pub fn checked(ret: anytype) void {
+    wtry(ret) catch unreachable;
 }
 
 pub const EventSource = struct {
diff --git a/src/aio/windows.zig b/src/aio/windows.zig
index 71036a2..8813a31 100644
--- a/src/aio/windows.zig
+++ b/src/aio/windows.zig
@@ -5,20 +5,28 @@ const Operation = @import("ops.zig").Operation;
 const ItemPool = @import("minilib").ItemPool;
 const DynamicThreadPool = @import("minilib").DynamicThreadPool;
 const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArrayList;
+const TimerQueue = @import("minilib").TimerQueue;
 const Fallback = @import("Fallback.zig");
 const Uringlator = @import("Uringlator.zig");
 const unexpectedError = @import("posix/windows.zig").unexpectedError;
+const werr = @import("posix/windows.zig").werr;
+const wtry = @import("posix/windows.zig").wtry;
 const checked = @import("posix/windows.zig").checked;
 const win32 = @import("win32");
 
+const Windows = @This();
+
 // This is a slightly lighter version of the Fallback backend.
 // Optimized for Windows and uses IOCP operations whenever possible.
+//
 
-const GetLastError = win32.foundation.GetLastError;
-const INVALID_HANDLE = std.os.windows.INVALID_HANDLE_VALUE;
-const HANDLE = win32.foundation.HANDLE;
-const CloseHandle = win32.foundation.CloseHandle;
-const io = win32.system.io;
+pub const IO = switch (aio.options.fallback) {
+    .auto => Fallback, // Fallback until Windows backend is complete
+    .force => Fallback, // use only the fallback backend
+    .disable => Windows, // use only the Windows backend
+};
+
+pub const EventSource = Uringlator.EventSource;
 
 comptime {
     if (builtin.single_threaded) {
@@ -28,46 +36,86 @@ comptime {
     }
 }
 
-pub const IO = switch (aio.options.fallback) {
-    .auto => Fallback, // Fallback until Windows backend is complete
-    .force => Fallback, // use only the fallback backend
-    .disable => @This(), // use only the Windows backend
-};
+const GetLastError = win32.foundation.GetLastError;
+const INVALID_HANDLE = std.os.windows.INVALID_HANDLE_VALUE;
+const HANDLE = win32.foundation.HANDLE;
+const CloseHandle = win32.foundation.CloseHandle;
+const INFINITE = win32.system.windows_programming.INFINITE;
+const io = win32.system.io;
+const fs = win32.storage.file_system;
 
-pub const EventSource = Uringlator.EventSource;
+const IoContext = struct {
+    overlapped: io.OVERLAPPED = undefined,
+    // need to dup handles to open them in OVERLAPPED mode
+    specimen: union(enum) {
+        handle: HANDLE,
+        none: void,
+    } = .none,
+    // operation-specific return value
+    res: usize = undefined,
+
+    pub fn deinit(self: *@This()) void {
+        switch (self.specimen) {
+            .handle => |h| checked(CloseHandle(h)),
+            .none => {},
+        }
+        self.* = undefined;
+    }
+};
 
-const Result = struct { failure: Operation.Error, id: u16 };
+// 4 is probably enough for iocp, reserve the rest for blocking tasks
+const max_iocp_threads = 4;
 
 port: HANDLE, // iocp completion port
-tpool: DynamicThreadPool, // thread pool for performing non iocp operations
+tqueue: TimerQueue, // timer queue implementing linux-like timers
+tpool: DynamicThreadPool, // thread pool for performing iocp and non-iocp operations
+iocp_threads_spawned: bool = false, // iocp needs its own poll threads
+ovls: []IoContext,
 uringlator: Uringlator,
+num_iocp_threads: u8,
 
 pub fn isSupported(_: []const type) bool {
     return true; // very optimistic :D
 }
 
 pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
-    const thread_count = aio.options.max_threads orelse @max(1, std.Thread.getCpuCount() catch 1);
-    const port = io.CreateIoCompletionPort(INVALID_HANDLE, null, 0, @intCast(thread_count));
-    if (port == null) return error.Unexpected;
+    // need at least 3 threads: 1 iocp thread, 1 timer thread and 1 non-iocp blocking task thread
+    const thread_count: u32 = @max(3, aio.options.max_threads orelse @as(u32, @intCast(std.Thread.getCpuCount() catch 1)));
+    const port = io.CreateIoCompletionPort(INVALID_HANDLE, null, 0, @intCast(thread_count)).?;
+    try wtry(port != INVALID_HANDLE);
     errdefer checked(CloseHandle(port));
-    var tpool = DynamicThreadPool.init(allocator, .{ .max_threads = aio.options.max_threads }) catch |err| return switch (err) {
+    var tqueue = try TimerQueue.init(allocator);
+    errdefer tqueue.deinit();
+    var tpool = DynamicThreadPool.init(allocator, .{ .max_threads = thread_count }) catch |err| return switch (err) {
        error.TimerUnsupported => error.SystemOutdated,
        else => |e| e,
     };
     errdefer tpool.deinit();
+    const ovls = try allocator.alloc(IoContext, n);
+    errdefer allocator.free(ovls);
     var uringlator = try Uringlator.init(allocator, n);
     errdefer uringlator.deinit(allocator);
     return .{
-        .port = port.?,
+        .port = port,
+        .tqueue = tqueue,
         .tpool = tpool,
+        .ovls = ovls,
         .uringlator = uringlator,
+        .num_iocp_threads = @min(thread_count - 2, max_iocp_threads),
     };
 }
 
+const SHUTDOWN_KEY = 0xDEADBEEF;
+
 pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
-    self.tpool.deinit();
+    // docs say that GetQueuedCompletionStatus should return if the IOCP port is closed
+    // this doesn't seem to happen under wine though (wine bug?)
+    // anyhow, wake up the drain threads by hand
+    for (0..self.num_iocp_threads) |_| checked(io.PostQueuedCompletionStatus(self.port, 0, SHUTDOWN_KEY, null));
     checked(CloseHandle(self.port));
+    self.tqueue.deinit();
+    self.tpool.deinit();
+    allocator.free(self.ovls);
     self.uringlator.deinit(allocator);
     self.* = undefined;
 }
@@ -78,8 +126,39 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic.
     try self.uringlator.queue(len, work, cb, *@This(), self, queueCallback);
 }
 
+fn iocpDrainThread(self: *@This()) void {
+    while (true) {
+        var transferred: u32 = undefined;
+        var key: usize = undefined;
+        var maybe_ovl: ?*io.OVERLAPPED = null;
+        const res = io.GetQueuedCompletionStatus(self.port, &transferred, &key, &maybe_ovl, INFINITE);
+        if (res == 1) {
+            if (key == SHUTDOWN_KEY) break;
+            // work complete
+            const ctx: *IoContext = @fieldParentPtr("overlapped", maybe_ovl.?);
+            ctx.res = transferred;
+            self.uringlator.finish(@intCast(key), error.Success);
+        } else if (maybe_ovl) |_| {
+            if (key == SHUTDOWN_KEY) break;
+            // operation failed
+            self.uringlator.finish(@intCast(key), werr(0));
+        } else {
+            // most likely the port was closed, but check the error here in the future to make sure
+            break;
+        }
+    }
+}
+
+fn spawnIocpThreads(self: *@This()) !void {
+    @setCold(true);
+    // spawn the iocp drain threads, tune num_iocp_threads if necessary in the future
+    for (0..self.num_iocp_threads) |_| self.tpool.spawn(iocpDrainThread, .{self}) catch return error.SystemResources;
+    self.iocp_threads_spawned = true;
+}
+
 pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.CompletionCallback) aio.Error!aio.CompletionResult {
     if (!try self.uringlator.submit(*@This(), self, start, cancelable)) return .{};
+    if (!self.iocp_threads_spawned) try self.spawnIocpThreads();
 
     const num_finished = self.uringlator.finished.len();
     if (mode == .blocking and num_finished == 0) {
@@ -116,9 +195,73 @@ fn onThreadPosixExecutor(self: *@This(), id: u16, uop: *Operation.Union) void {
     self.uringlator.finish(id, failure);
 }
 
+fn onThreadTimeout(ctx: *anyopaque, user_data: usize) void {
+    var self: *@This() = @ptrCast(@alignCast(ctx));
+    self.uringlator.finish(@intCast(user_data), error.Success);
+}
+
+fn ovlOff(offset: u64) io.OVERLAPPED {
+    return .{
+        .Internal = undefined,
+        .InternalHigh = undefined,
+        .Anonymous = .{ .Anonymous = @bitCast(offset) },
+        .hEvent = undefined,
+    };
+}
+
+const AccessInfo = packed struct {
+    read: bool,
+    write: bool,
+    append: bool,
+};
+
+fn getHandleAccessInfo(handle: HANDLE) !AccessInfo {
+    var io_status_block: std.os.windows.IO_STATUS_BLOCK = undefined;
+    var access: std.os.windows.FILE_ACCESS_INFORMATION = undefined;
+    const rc = std.os.windows.ntdll.NtQueryInformationFile(handle, &io_status_block, &access, @sizeOf(std.os.windows.FILE_ACCESS_INFORMATION), .FileAccessInformation);
+    switch (rc) {
+        .SUCCESS => {},
+        .INVALID_PARAMETER => unreachable,
+        else => return error.Unexpected,
+    }
+    return .{
+        .read = access.AccessFlags & std.os.windows.FILE_READ_DATA != 0,
+        .write = access.AccessFlags & std.os.windows.FILE_WRITE_DATA != 0,
+        .append = access.AccessFlags & std.os.windows.FILE_APPEND_DATA != 0,
+    };
+}
+
 fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
+    var trash: u32 = undefined;
     switch (uop.*) {
-        // TODO: handle IOCP operations here
+        .read => |op| {
+            if (!(try getHandleAccessInfo(op.file.handle)).read) {
+                self.ovls[id] = .{};
+                self.uringlator.finish(id, error.NotOpenForReading);
+                return;
+            }
+            const h = fs.ReOpenFile(op.file.handle, .{ .SYNCHRONIZE = 1, .FILE_READ_DATA = 1 }, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?;
+            checked(h != INVALID_HANDLE);
+            checked(io.CreateIoCompletionPort(h, self.port, id, 0).? != INVALID_HANDLE);
+            self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .specimen = .{ .handle = h } };
+            try wtry(fs.ReadFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped));
+        },
+        .write => |op| {
+            if (!(try getHandleAccessInfo(op.file.handle)).write) {
+                self.ovls[id] = .{};
+                self.uringlator.finish(id, error.NotOpenForWriting);
+                return;
+            }
+            const h = fs.ReOpenFile(op.file.handle, .{ .SYNCHRONIZE = 1, .FILE_WRITE_DATA = 1 }, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?;
+            checked(h != INVALID_HANDLE);
+            checked(io.CreateIoCompletionPort(h, self.port, id, 0).? != INVALID_HANDLE);
+            self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .specimen = .{ .handle = h } };
+            try wtry(fs.WriteFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped));
+        },
+        inline .timeout, .link_timeout => |*op| {
+            const closure: TimerQueue.Closure = .{ .context = self, .callback = onThreadTimeout };
+            try self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure });
+        },
         else => {
             // perform non IOCP supported operation on a thread
             self.tpool.spawn(onThreadPosixExecutor, .{ self, id, uop }) catch return error.SystemResources;
@@ -126,8 +269,24 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
     }
 }
 
-fn cancelable(_: *@This(), _: u16, _: *Operation.Union) bool {
-    return false;
+fn cancelable(_: *@This(), _: u16, uop: *Operation.Union) bool {
+    return switch (uop.*) {
+        .timeout, .link_timeout => true,
+        else => false,
+    };
 }
 
-fn completion(_: *@This(), _: u16, _: *Operation.Union) void {}
+fn completion(self: *@This(), id: u16, uop: *Operation.Union) void {
+    switch (uop.*) {
+        .timeout, .link_timeout => self.tqueue.disarm(.monotonic, id),
+        .read => |op| {
+            op.out_read.* = self.ovls[id].res;
+            self.ovls[id].deinit();
+        },
+        .write => |op| {
+            if (op.out_written) |w| w.* = self.ovls[id].res;
+            self.ovls[id].deinit();
+        },
+        else => {},
+    }
+}
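Note (reviewer illustration, not part of the patch): the deinit/iocpDrainThread pair above wakes each drain thread by posting a sentinel completion key, since GetQueuedCompletionStatus is not guaranteed to return when the port is closed (see the wine remark in the comment). Below is a minimal standalone sketch of that wake-up pattern. It reuses only calls the patch itself makes through zigwin32 and std; the names drainLoop and main are illustrative and not part of the codebase.

const std = @import("std");
const win32 = @import("win32"); // zigwin32, wired up for tests in build.zig above
const io = win32.system.io;
const CloseHandle = win32.foundation.CloseHandle;
const INVALID_HANDLE = std.os.windows.INVALID_HANDLE_VALUE;
const INFINITE = win32.system.windows_programming.INFINITE;

const SHUTDOWN_KEY = 0xDEADBEEF; // same sentinel value the backend uses

// Blocks on the completion port until the shutdown sentinel arrives or the port goes away.
fn drainLoop(port: win32.foundation.HANDLE) void {
    while (true) {
        var transferred: u32 = undefined;
        var key: usize = undefined;
        var maybe_ovl: ?*io.OVERLAPPED = null;
        const ok = io.GetQueuedCompletionStatus(port, &transferred, &key, &maybe_ovl, INFINITE);
        if (ok == 1) {
            if (key == SHUTDOWN_KEY) break; // woken by PostQueuedCompletionStatus
            // a real drain loop would recover the IoContext here via @fieldParentPtr("overlapped", ...)
        } else if (maybe_ovl != null) {
            if (key == SHUTDOWN_KEY) break;
            // a dequeued operation failed; the backend maps this through werr(0)
        } else {
            break; // the port itself went away
        }
    }
}

pub fn main() !void {
    const port = io.CreateIoCompletionPort(INVALID_HANDLE, null, 0, 1) orelse return error.Unexpected;
    defer _ = CloseHandle(port);
    const thread = try std.Thread.spawn(.{}, drainLoop, .{port});
    // wake the drain thread by hand instead of relying on the port being closed
    _ = io.PostQueuedCompletionStatus(port, 0, SHUTDOWN_KEY, null);
    thread.join();
}

The explicit PostQueuedCompletionStatus wake-up remains correct even where closing the port does unblock the waiters; it is simply redundant there.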