Skip to content

Commit

Permalink
windows: implement ChildExit
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jul 19, 2024
1 parent dffba0d commit 0928ced
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 103 deletions.
25 changes: 16 additions & 9 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -498,15 +498,22 @@ test "SymlinkAt" {
}

test "ChildExit" {
if (builtin.target.os.tag == .windows or builtin.target.os.tag == .wasi) {
return error.SkipZigTest;
}

const pid = try std.posix.fork();
if (pid == 0) {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.exit(69);
}
const pid = switch (builtin.target.os.tag) {
.linux, .freebsd, .openbsd, .dragonfly, .netbsd, .macos, .ios, .watchos, .visionos, .tvos => blk: {
const pid = try std.posix.fork();
if (pid == 0) {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.exit(69);
}
break :blk pid;
},
.windows => blk: {
var child = std.process.Child.init(&.{ "cmd.exe", "/c", "exit 69" }, std.heap.page_allocator);
try child.spawn();
break :blk child.id;
},
else => return error.SkipZigTest,
};
var term: std.process.Child.Term = undefined;
try single(ChildExit{ .child = pid, .out_term = &term });
if (term == .Signal) {
Expand Down
118 changes: 72 additions & 46 deletions src/aio/Windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const IoContext = struct {
// needs to be cleaned up
owned: union(enum) {
handle: HANDLE,
job: HANDLE,
none: void,
} = .none,

Expand All @@ -53,7 +54,7 @@ const IoContext = struct {

pub fn deinit(self: *@This()) void {
switch (self.owned) {
.handle => |h| checked(CloseHandle(h)),
inline .handle, .job => |h| checked(CloseHandle(h)),
.none => {},
}
self.* = undefined;
Expand Down Expand Up @@ -123,31 +124,37 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic.
fn iocpDrainThread(self: *@This()) void {
while (true) {
var transferred: u32 = undefined;
var key: usize = undefined;
var key: Iocp.Key = undefined;
var maybe_ovl: ?*io.OVERLAPPED = null;
const res = io.GetQueuedCompletionStatus(self.iocp.port, &transferred, &key, &maybe_ovl, INFINITE);
const res = io.GetQueuedCompletionStatus(self.iocp.port, &transferred, @ptrCast(&key), &maybe_ovl, INFINITE);
if (res == 1) {
if (key == Iocp.Notification.Key) {
const note: Iocp.Notification = @bitCast(transferred);
switch (note.type) {
.shutdown => break,
.event_source => {
const source: *EventSource = @ptrCast(@alignCast(maybe_ovl.?));
source.wait();
self.uringlator.finish(note.id, error.Success);
},
}
} else {
const ctx: *IoContext = @fieldParentPtr("overlapped", maybe_ovl.?);
ctx.res = transferred;
const id: u16 = @intCast((@intFromPtr(ctx) - @intFromPtr(self.ovls.ptr)) / @sizeOf(IoContext));
self.uringlator.finish(id, error.Success);
switch (key.type) {
.shutdown => break,
.event_source => {
const source: *EventSource = @ptrCast(@alignCast(maybe_ovl.?));
source.wait();
},
.child_exit => {
switch (transferred) {
win32.system.system_services.JOB_OBJECT_MSG_EXIT_PROCESS, win32.system.system_services.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS => {},
else => continue,
}
const op = &self.uringlator.ops.nodes[key.id].used.child_exit;
if (op.out_term) |term| {
var code: u32 = undefined;
if (win32.system.threading.GetExitCodeProcess(op.child, &code) == 0) {
term.* = .{ .Unknown = 0 };
} else {
term.* = .{ .Exited = @truncate(code) };
}
}
},
.overlapped => self.ovls[key.id].res = transferred,
}
self.uringlator.finish(key.id, error.Success);
} else if (maybe_ovl) |_| {
std.debug.assert(key != Iocp.Notification.Key);
const ctx: *IoContext = @fieldParentPtr("overlapped", maybe_ovl.?);
const id: u16 = @intCast((@intFromPtr(ctx) - @intFromPtr(self.ovls.ptr)) / @sizeOf(IoContext));
self.uringlator.finish(id, werr(0));
std.debug.assert(key.type != .shutdown);
self.uringlator.finish(key.id, werr(0));
} else {
break;
}
Expand Down Expand Up @@ -239,52 +246,65 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
switch (uop.*) {
.read => |*op| {
const flags = try getHandleAccessInfo(op.file.handle);
if (flags.FILE_READ_DATA != 1) {
self.uringlator.finish(id, error.NotOpenForReading);
return;
}
const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?;
wtry(h != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateHandle(h) catch |err| return self.uringlator.finish(id, err);
self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h } };
wtry(fs.ReadFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err);
if (flags.FILE_READ_DATA != 1) return self.uringlator.finish(id, error.NotOpenForReading);
const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED);
wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(id, err);
self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h.? } };
wtry(fs.ReadFile(h.?, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err);
},
.write => |*op| {
const flags = try getHandleAccessInfo(op.file.handle);
if (flags.FILE_WRITE_DATA != 1) {
self.uringlator.finish(id, error.NotOpenForWriting);
return;
}
const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED).?;
wtry(h != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateHandle(h) catch |err| return self.uringlator.finish(id, err);
self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h } };
wtry(fs.WriteFile(h, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err);
if (flags.FILE_WRITE_DATA != 1) return self.uringlator.finish(id, error.NotOpenForWriting);
const h = fs.ReOpenFile(op.file.handle, flags, .{ .READ = 1, .WRITE = 1 }, fs.FILE_FLAG_OVERLAPPED);
wtry(h != null and h.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateHandle(id, h.?) catch |err| return self.uringlator.finish(id, err);
self.ovls[id] = .{ .overlapped = ovlOff(op.offset), .owned = .{ .handle = h.? } };
wtry(fs.WriteFile(h.?, op.buffer.ptr, @intCast(op.buffer.len), &trash, &self.ovls[id].overlapped)) catch |err| return self.uringlator.finish(id, err);
},
.accept => |*op| {
self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err);
op.out_socket.* = aio.socket(std.posix.AF.INET, 0, 0) catch |err| return self.uringlator.finish(id, err);
wtry(win_sock.AcceptEx(op.socket, op.out_socket.*, &op._, 0, @sizeOf(std.posix.sockaddr) + 16, @sizeOf(std.posix.sockaddr) + 16, &trash, &self.ovls[id].overlapped) == 1) catch |err| return self.uringlator.finish(id, err);
},
.recv => |*op| {
self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err);
_ = wposix.recvEx(op.socket, &op._, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err);
},
.send => |*op| {
self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err);
_ = wposix.sendEx(op.socket, &op._, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err);
},
.send_msg => |*op| {
self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err);
_ = wposix.sendmsgEx(op.socket, @constCast(op.msg), 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err);
},
.recv_msg => |*op| {
self.iocp.associateSocket(op.socket) catch |err| return self.uringlator.finish(id, err);
self.iocp.associateSocket(id, op.socket) catch |err| return self.uringlator.finish(id, err);
_ = wposix.recvmsgEx(op.socket, op.out_msg, 0, &self.ovls[id].overlapped) catch |err| return self.uringlator.finish(id, err);
},
inline .timeout, .link_timeout => |*op| {
const closure: TimerQueue.Closure = .{ .context = self, .callback = onThreadTimeout };
self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure }) catch self.uringlator.finish(id, error.Unexpected);
self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure }) catch return self.uringlator.finish(id, error.Unexpected);
},
.child_exit => |*op| {
const job = win32.system.job_objects.CreateJobObjectW(null, null);
wtry(job != null and job.? != INVALID_HANDLE) catch |err| return self.uringlator.finish(id, err);
errdefer checked(CloseHandle(job.?));
wtry(win32.system.job_objects.AssignProcessToJobObject(job.?, op.child)) catch return self.uringlator.finish(id, error.Unexpected);
const key: Iocp.Key = .{ .type = .child_exit, .id = id };
var assoc: win32.system.job_objects.JOBOBJECT_ASSOCIATE_COMPLETION_PORT = .{
.CompletionKey = @ptrFromInt(@as(usize, @bitCast(key))),
.CompletionPort = self.iocp.port,
};
self.ovls[id] = .{ .owned = .{ .job = job.? } };
errdefer self.ovls[id] = .{};
wtry(win32.system.job_objects.SetInformationJobObject(
job.?,
win32.system.job_objects.JobObjectAssociateCompletionPortInformation,
@ptrCast(&assoc),
@sizeOf(@TypeOf(assoc)),
)) catch return self.uringlator.finish(id, error.Unexpected);
},
.wait_event_source => |*op| op.source.native.addWaiter(&op._.link),
// can be performed without a thread
Expand All @@ -304,6 +324,12 @@ fn cancel(self: *@This(), id: u16, uop: *Operation.Union) bool {
inline .accept, .recv, .send, .send_msg, .recv_msg => |*op| {
return io.CancelIoEx(@ptrCast(op.socket), &self.ovls[id].overlapped) != 0;
},
.child_exit => {
self.ovls[id].deinit();
self.ovls[id] = .{};
self.uringlator.finish(id, error.Canceled);
return true;
},
.timeout, .link_timeout, .wait_event_source => {
self.tqueue.disarm(.monotonic, id);
self.uringlator.finish(id, error.Canceled);
Expand Down
21 changes: 18 additions & 3 deletions src/aio/posix/posix.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,28 @@ pub const EventSource = switch (builtin.target.os.tag) {
else => PipeEventSource,
};

const DummyChildWatcher = struct {
id: std.process.Child.Id,
fd: std.posix.fd_t,

pub fn init(_: std.process.Child.Id) !@This() {
@panic("platform does not support posix ChildWatcher");
}

pub fn wait(_: *@This()) std.process.Child.Term {
@panic("platform does not support posix ChildWatcher");
}

pub fn deinit(_: *@This()) void {
@panic("platform does not support posix ChildWatcher");
}
};

pub const ChildWatcher = switch (builtin.target.os.tag) {
.linux => linux.ChildWatcher,
.windows => windows.ChildWatcher,
.freebsd, .openbsd, .dragonfly, .netbsd => bsd.ChildWatcher,
.macos, .ios, .watchos, .visionos, .tvos => darwin.ChildWatcher,
.wasi => wasi.ChildWatcher,
else => @compileError("unsupported"),
else => DummyChildWatcher,
};

pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O {
Expand Down
18 changes: 0 additions & 18 deletions src/aio/posix/wasi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,6 @@ const log = std.log.scoped(.aio_wasi);
// so not implementing that until something there changes
pub const EventSource = posix.PipeEventSource;

pub const ChildWatcher = struct {
id: std.process.Child.Id,
fd: std.posix.fd_t,

pub fn init(id: std.process.Child.Id) !@This() {
if (true) @panic("unavailable on the wasi platform");
return .{ .id = id, .fd = 0 };
}

pub fn wait(_: *@This()) std.process.Child.Term {
if (true) @panic("unavailable on the wasi platform");
}

pub fn deinit(self: *@This()) void {
self.* = undefined;
}
};

pub const BIGGEST_ALIGNMENT = 16;

pub const sa_family_t = u16;
Expand Down
38 changes: 11 additions & 27 deletions src/aio/posix/windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ pub fn checked(ret: anytype) void {

// Light wrapper, mainly to link EventSources to this
pub const Iocp = struct {
pub const Notification = packed struct(u32) {
pub const Key = std.math.maxInt(usize);
pub const Key = packed struct(usize) {
type: enum(u16) {
shutdown,
event_source,
child_exit,
overlapped,
},
id: u16,
_: @Type(.{ .Int = .{ .bits = @bitSizeOf(usize) - @bitSizeOf(u16) * 2, .signedness = .unsigned } }) = undefined,
};

port: HANDLE,
Expand All @@ -64,9 +66,9 @@ pub const Iocp = struct {
return .{ .port = port, .num_threads = num_threads };
}

pub fn notify(self: *@This(), data: Notification, ptr: ?*anyopaque) void {
pub fn notify(self: *@This(), key: Key, ptr: ?*anyopaque) void {
// data for notification is put into the transferred bytes, overlapped can be anything
checked(io.PostQueuedCompletionStatus(self.port, @bitCast(data), Notification.Key, @ptrCast(@alignCast(ptr))));
checked(io.PostQueuedCompletionStatus(self.port, 0, @bitCast(key), @ptrCast(@alignCast(ptr))));
}

pub fn deinit(self: *@This()) void {
Expand All @@ -78,18 +80,18 @@ pub const Iocp = struct {
self.* = undefined;
}

pub fn associateHandle(self: *@This(), handle: HANDLE) !void {
// always assiocate the handle as the key, because the key should have the life time of a handle
const res = io.CreateIoCompletionPort(handle, self.port, @intFromPtr(handle), 0);
pub fn associateHandle(self: *@This(), id: u16, handle: HANDLE) !void {
const key: Key = .{ .type = .overlapped, .id = id };
const res = io.CreateIoCompletionPort(handle, self.port, @bitCast(key), 0);
if (res == null or res.? == INVALID_HANDLE) {
// ignore 87 as it may mean that we just re-registered the handle
if (GetLastError() == .ERROR_INVALID_PARAMETER) return;
try wtry(0);
}
}

pub fn associateSocket(self: *@This(), sock: std.posix.socket_t) !void {
return self.associateHandle(@ptrCast(sock));
pub fn associateSocket(self: *@This(), id: u16, sock: std.posix.socket_t) !void {
return self.associateHandle(id, @ptrCast(sock));
}
};

Expand Down Expand Up @@ -158,24 +160,6 @@ pub const EventSource = struct {
}
};

pub const ChildWatcher = struct {
id: std.process.Child.Id,
fd: std.posix.fd_t,

pub fn init(id: std.process.Child.Id) !@This() {
if (true) @panic("fixme");
return .{ .id = id, .fd = 0 };
}

pub fn wait(_: *@This()) std.process.Child.Term {
if (true) @panic("fixme");
}

pub fn deinit(self: *@This()) void {
self.* = undefined;
}
};

pub fn translateTty(_: std.posix.fd_t, _: []u8, _: *ops.ReadTty.TranslationState) ops.ReadTty.Error!usize {
if (true) @panic("TODO");
return 0;
Expand Down

0 comments on commit 0928ced

Please sign in to comment.