From 0928cedb2672bdd441d930ac7e619e4d8ef8f563 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Fri, 19 Jul 2024 16:15:03 +0900 Subject: [PATCH] windows: implement ChildExit --- src/aio.zig | 25 +++++--- src/aio/Windows.zig | 118 +++++++++++++++++++++++--------------- src/aio/posix/posix.zig | 21 ++++++- src/aio/posix/wasi.zig | 18 ------ src/aio/posix/windows.zig | 38 ++++-------- 5 files changed, 117 insertions(+), 103 deletions(-) diff --git a/src/aio.zig b/src/aio.zig index 39cdac5..bb4e960 100644 --- a/src/aio.zig +++ b/src/aio.zig @@ -498,15 +498,22 @@ test "SymlinkAt" { } test "ChildExit" { - if (builtin.target.os.tag == .windows or builtin.target.os.tag == .wasi) { - return error.SkipZigTest; - } - - const pid = try std.posix.fork(); - if (pid == 0) { - std.time.sleep(1 * std.time.ns_per_s); - std.posix.exit(69); - } + const pid = switch (builtin.target.os.tag) { + .linux, .freebsd, .openbsd, .dragonfly, .netbsd, .macos, .ios, .watchos, .visionos, .tvos => blk: { + const pid = try std.posix.fork(); + if (pid == 0) { + std.time.sleep(1 * std.time.ns_per_s); + std.posix.exit(69); + } + break :blk pid; + }, + .windows => blk: { + var child = std.process.Child.init(&.{ "cmd.exe", "/c", "exit 69" }, std.heap.page_allocator); + try child.spawn(); + break :blk child.id; + }, + else => return error.SkipZigTest, + }; var term: std.process.Child.Term = undefined; try single(ChildExit{ .child = pid, .out_term = &term }); if (term == .Signal) { diff --git a/src/aio/Windows.zig b/src/aio/Windows.zig index 3f1a415..448cdca 100644 --- a/src/aio/Windows.zig +++ b/src/aio/Windows.zig @@ -45,6 +45,7 @@ const IoContext = struct { // needs to be cleaned up owned: union(enum) { handle: HANDLE, + job: HANDLE, none: void, } = .none, @@ -53,7 +54,7 @@ const IoContext = struct { pub fn deinit(self: *@This()) void { switch (self.owned) { - .handle => |h| checked(CloseHandle(h)), + inline .handle, .job => |h| checked(CloseHandle(h)), .none => {}, } self.* = undefined; @@ -123,31 +124,37 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic. fn iocpDrainThread(self: *@This()) void { while (true) { var transferred: u32 = undefined; - var key: usize = undefined; + var key: Iocp.Key = undefined; var maybe_ovl: ?*io.OVERLAPPED = null; - const res = io.GetQueuedCompletionStatus(self.iocp.port, &transferred, &key, &maybe_ovl, INFINITE); + const res = io.GetQueuedCompletionStatus(self.iocp.port, &transferred, @ptrCast(&key), &maybe_ovl, INFINITE); if (res == 1) { - if (key == Iocp.Notification.Key) { - const note: Iocp.Notification = @bitCast(transferred); - switch (note.type) { - .shutdown => break, - .event_source => { - const source: *EventSource = @ptrCast(@alignCast(maybe_ovl.?)); - source.wait(); - self.uringlator.finish(note.id, error.Success); - }, - } - } else { - const ctx: *IoContext = @fieldParentPtr("overlapped", maybe_ovl.?); - ctx.res = transferred; - const id: u16 = @intCast((@intFromPtr(ctx) - @intFromPtr(self.ovls.ptr)) / @sizeOf(IoContext)); - self.uringlator.finish(id, error.Success); + switch (key.type) { + .shutdown => break, + .event_source => { + const source: *EventSource = @ptrCast(@alignCast(maybe_ovl.?)); + source.wait(); + }, + .child_exit => { + switch (transferred) { + win32.system.system_services.JOB_OBJECT_MSG_EXIT_PROCESS, win32.system.system_services.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS => {}, + else => continue, + } + const op = &self.uringlator.ops.nodes[key.id].used.child_exit; + if (op.out_term) |term| { + var code: u32 = undefined; + if (win32.system.threading.GetExitCodeProcess(op.child, &code) == 0) { + term.* = .{ .Unknown = 0 }; + } else { + term.* = .{ .Exited = @truncate(code) }; + } + } + }, + .overlapped => self.ovls[key.id].res = transferred, } + self.uringlator.finish(key.id, error.Success); } else if (maybe_ovl) |_| { - std.debug.assert(key != Iocp.Notification.Key); - const ctx: *IoContext = @fieldParentPtr("overlapped", maybe_ovl.?); - const id: u16 = @intCast((@intFromPtr(ctx) - @intFromPtr(self.ovls.ptr)) / @sizeOf(IoContext)); - self.uringlator.finish(id, werr(0)); + std.debug.assert(key.type != .shutdown); + self.uringlator.finish(key.id, werr(0)); } else { break; } @@ -239,52 +246,65 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void { switch (uop.*) { .read => |*op| { const flags = try getHandleAccessInfo(op.file.handle); - if (flags.FILE_READ_DATA != 1) { - self.uringlator.finish(id, error.NotOpenForReading); - return; - } - const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?; - wtry(h != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err); - self.iocp.associateHandle(h) catch |err| return self.uringlator.finish(id, err); - self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h } }; - wtry(fs.ReadFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err); + if (flags.FILE_READ_DATA != 1) return self.uringlator.finish(id, error.NotOpenForReading); + const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED); + wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(id, err); + self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h.? } }; + wtry(fs.ReadFile(h.?, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err); }, .write => |*op| { const flags = try getHandleAccessInfo(op.file.handle); - if (flags.FILE_WRITE_DATA != 1) { - self.uringlator.finish(id, error.NotOpenForWriting); - return; - } - const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?; - wtry(h != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err); - self.iocp.associateHandle(h) catch |err| return self.uringlator.finish(id, err); - self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h } }; - wtry(fs.WriteFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err); + if (flags.FILE_WRITE_DATA != 1) return self.uringlator.finish(id, error.NotOpenForWriting); + const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED); + wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(id, err); + self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h.? } }; + wtry(fs.WriteFile(h.?, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err); }, .accept => |*op| { - self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err); op.out_socket.* = aio.socket(std.posix.AF.INET, 0, 0) catch |err| return self.uringlator.finish(id, err); wtry(win_sock.AcceptEx(op.socket, op.out_socket.*, &op._, 0, @sizeOf(std.posix.sockaddr) + 16, @sizeOf(std.posix.sockaddr) + 16, &trash, &self.ovls[id].overlapped) == 1) catch |err| return self.uringlator.finish(id, err); }, .recv => |*op| { - self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err); _ = wposix.recvEx(op.socket, &op._, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err); }, .send => |*op| { - self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err); _ = wposix.sendEx(op.socket, &op._, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err); }, .send_msg => |*op| { - self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err); _ = wposix.sendmsgEx(op.socket, @constCast(op.msg), 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err); }, .recv_msg => |*op| { - self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err); + self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err); _ = wposix.recvmsgEx(op.socket, op.out_msg, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err); }, inline .timeout, .link_timeout => |*op| { const closure: TimerQueue.Closure = .{ .context = self, .callback = onThreadTimeout }; - self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure }) catch self.uringlator.finish(id, error.Unexpected); + self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure }) catch return self.uringlator.finish(id, error.Unexpected); + }, + .child_exit => |*op| { + const job = win32.system.job_objects.CreateJobObjectW(null, null); + wtry(job != null and job.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err); + errdefer checked(CloseHandle(job.?)); + wtry(win32.system.job_objects.AssignProcessToJobObject(job.?, op.child)) catch return self.uringlator.finish(id, error.Unexpected); + const key: Iocp.Key = .{ .type = .child_exit, .id = id }; + var assoc: win32.system.job_objects.JOBOBJECT_ASSOCIATE_COMPLETION_PORT = .{ + .CompletionKey = @ptrFromInt(@as(usize, @bitCast(key))), + .CompletionPort = self.iocp.port, + }; + self.ovls[id] = .{ .owned = .{ .job = job.? } }; + errdefer self.ovls[id] = .{}; + wtry(win32.system.job_objects.SetInformationJobObject( + job.?, + win32.system.job_objects.JobObjectAssociateCompletionPortInformation, + @ptrCast(&assoc), + @sizeOf(@TypeOf(assoc)), + )) catch return self.uringlator.finish(id, error.Unexpected); }, .wait_event_source => |*op| op.source.native.addWaiter(&op._.link), // can be performed without a thread @@ -304,6 +324,12 @@ fn cancel(self: *@This(), id: u16, uop: *Operation.Union) bool { inline .accept, .recv, .send, .send_msg, .recv_msg => |*op| { return io.CancelIoEx(@ptrCast(op.socket), &self.ovls[id].overlapped) != 0; }, + .child_exit => { + self.ovls[id].deinit(); + self.ovls[id] = .{}; + self.uringlator.finish(id, error.Canceled); + return true; + }, .timeout, .link_timeout, .wait_event_source => { self.tqueue.disarm(.monotonic, id); self.uringlator.finish(id, error.Canceled); diff --git a/src/aio/posix/posix.zig b/src/aio/posix/posix.zig index 520ce10..fc992d6 100644 --- a/src/aio/posix/posix.zig +++ b/src/aio/posix/posix.zig @@ -57,13 +57,28 @@ pub const EventSource = switch (builtin.target.os.tag) { else => PipeEventSource, }; +const DummyChildWatcher = struct { + id: std.process.Child.Id, + fd: std.posix.fd_t, + + pub fn init(_: std.process.Child.Id) !@This() { + @panic("platform does not support posix ChildWatcher"); + } + + pub fn wait(_: *@This()) std.process.Child.Term { + @panic("platform does not support posix ChildWatcher"); + } + + pub fn deinit(_: *@This()) void { + @panic("platform does not support posix ChildWatcher"); + } +}; + pub const ChildWatcher = switch (builtin.target.os.tag) { .linux => linux.ChildWatcher, - .windows => windows.ChildWatcher, .freebsd, .openbsd, .dragonfly, .netbsd => bsd.ChildWatcher, .macos, .ios, .watchos, .visionos, .tvos => darwin.ChildWatcher, - .wasi => wasi.ChildWatcher, - else => @compileError("unsupported"), + else => DummyChildWatcher, }; pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { diff --git a/src/aio/posix/wasi.zig b/src/aio/posix/wasi.zig index 3689ab7..d0d7750 100644 --- a/src/aio/posix/wasi.zig +++ b/src/aio/posix/wasi.zig @@ -8,24 +8,6 @@ const log = std.log.scoped(.aio_wasi); // so not implementing that until something there changes pub const EventSource = posix.PipeEventSource; -pub const ChildWatcher = struct { - id: std.process.Child.Id, - fd: std.posix.fd_t, - - pub fn init(id: std.process.Child.Id) !@This() { - if (true) @panic("unavailable on the wasi platform"); - return .{ .id = id, .fd = 0 }; - } - - pub fn wait(_: *@This()) std.process.Child.Term { - if (true) @panic("unavailable on the wasi platform"); - } - - pub fn deinit(self: *@This()) void { - self.* = undefined; - } -}; - pub const BIGGEST_ALIGNMENT = 16; pub const sa_family_t = u16; diff --git a/src/aio/posix/windows.zig b/src/aio/posix/windows.zig index 9286a92..9bc452d 100644 --- a/src/aio/posix/windows.zig +++ b/src/aio/posix/windows.zig @@ -45,13 +45,15 @@ pub fn checked(ret: anytype) void { // Light wrapper, mainly to link EventSources to this pub const Iocp = struct { - pub const Notification = packed struct(u32) { - pub const Key = std.math.maxInt(usize); + pub const Key = packed struct(usize) { type: enum(u16) { shutdown, event_source, + child_exit, + overlapped, }, id: u16, + _: @Type(.{ .Int = .{ .bits = @bitSizeOf(usize) - @bitSizeOf(u16) * 2, .signedness = .unsigned } }) = undefined, }; port: HANDLE, @@ -64,9 +66,9 @@ pub const Iocp = struct { return .{ .port = port, .num_threads = num_threads }; } - pub fn notify(self: *@This(), data: Notification, ptr: ?*anyopaque) void { + pub fn notify(self: *@This(), key: Key, ptr: ?*anyopaque) void { // data for notification is put into the transferred bytes, overlapped can be anything - checked(io.PostQueuedCompletionStatus(self.port, @bitCast(data), Notification.Key, @ptrCast(@alignCast(ptr)))); + checked(io.PostQueuedCompletionStatus(self.port, 0, @bitCast(key), @ptrCast(@alignCast(ptr)))); } pub fn deinit(self: *@This()) void { @@ -78,9 +80,9 @@ pub const Iocp = struct { self.* = undefined; } - pub fn associateHandle(self: *@This(), handle: HANDLE) !void { - // always assiocate the handle as the key, because the key should have the life time of a handle - const res = io.CreateIoCompletionPort(handle, self.port, @intFromPtr(handle), 0); + pub fn associateHandle(self: *@This(), id: u16, handle: HANDLE) !void { + const key: Key = .{ .type = .overlapped, .id = id }; + const res = io.CreateIoCompletionPort(handle, self.port, @bitCast(key), 0); if (res == null or res.? == INVALID_HANDLE) { // ignore 87 as it may mean that we just re-registered the handle if (GetLastError() == .ERROR_INVALID_PARAMETER) return; @@ -88,8 +90,8 @@ pub const Iocp = struct { } } - pub fn associateSocket(self: *@This(), sock: std.posix.socket_t) !void { - return self.associateHandle(@ptrCast(sock)); + pub fn associateSocket(self: *@This(), id: u16, sock: std.posix.socket_t) !void { + return self.associateHandle(id, @ptrCast(sock)); } }; @@ -158,24 +160,6 @@ pub const EventSource = struct { } }; -pub const ChildWatcher = struct { - id: std.process.Child.Id, - fd: std.posix.fd_t, - - pub fn init(id: std.process.Child.Id) !@This() { - if (true) @panic("fixme"); - return .{ .id = id, .fd = 0 }; - } - - pub fn wait(_: *@This()) std.process.Child.Term { - if (true) @panic("fixme"); - } - - pub fn deinit(self: *@This()) void { - self.* = undefined; - } -}; - pub fn translateTty(_: std.posix.fd_t, _: []u8, _: *ops.ReadTty.TranslationState) ops.ReadTty.Error!usize { if (true) @panic("TODO"); return 0;