Skip to content

Commit

Permalink
aio: replace timers with TimerQueue
Browse files Browse the repository at this point in the history
This TimerQueue emulates Linux timerfd on most platforms.
Using OS-native timers becomes hairy when you want behaviour, features
and precision similar to Linux. On Linux, timerfd is used, and with
io_uring, io_uring's own timeout is used instead.
  • Loading branch information
Cloudef committed Jul 4, 2024
1 parent 32a33c7 commit d80b6e2
Show file tree
Hide file tree
Showing 11 changed files with 533 additions and 173 deletions.
4 changes: 4 additions & 0 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ pub const Error = error{
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
SystemResources,
UserResourceLimitReached,
ThreadQuotaExceeded,
LockedMemoryLimitExceeded,
SystemOutdated,
Unsupported,
Unexpected,
};

Expand Down
31 changes: 24 additions & 7 deletions src/aio/Fallback.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const Operation = @import("ops.zig").Operation;
const ItemPool = @import("minilib").ItemPool;
const FixedArrayList = @import("minilib").FixedArrayList;
const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArrayList;
const TimerQueue = @import("minilib").TimerQueue;
const DynamicThreadPool = @import("minilib").DynamicThreadPool;
const Uringlator = @import("Uringlator.zig");
const log = std.log.scoped(.aio_fallback);
Expand All @@ -28,8 +29,7 @@ comptime {

pub const EventSource = posix.EventSource;

const Result = struct { failure: Operation.Error, id: u16 };

tqueue: TimerQueue, // timer queue implementing linux-like timers
readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation
pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup
tpool: DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here
Expand All @@ -42,6 +42,8 @@ pub fn isSupported(_: []const type) bool {
}

pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
var tqueue = try TimerQueue.init(allocator);
errdefer tqueue.deinit();
const readiness = try allocator.alloc(posix.Readiness, n);
errdefer allocator.free(readiness);
@memset(readiness, .{});
Expand All @@ -63,6 +65,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
errdefer uringlator.deinit(allocator);
pfd.add(.{ .fd = uringlator.source.fd, .events = std.posix.POLL.IN, .revents = 0 }) catch unreachable;
return .{
.tqueue = tqueue,
.readiness = readiness,
.pfd = pfd,
.tpool = tpool,
Expand All @@ -73,6 +76,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
}

pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
self.tqueue.deinit();
self.tpool.deinit();
self.kludge_tpool.deinit();
var iter = self.pending.iterator(.{});
Expand Down Expand Up @@ -167,11 +171,18 @@ fn onThreadExecutor(self: *@This(), id: u16, uop: *Operation.Union, readiness: p
self.uringlator.finish(id, failure);
}

/// Callback handed to the TimerQueue through a `TimerQueue.Closure`.
/// `ctx` is the closure's `context` pointer and is cast back to this struct.
/// `user_data` is narrowed to the operation id and reported to the uringlator
/// as finished with `error.Success`.
/// NOTE(review): presumably invoked from the timer queue's own thread — confirm.
fn onThreadTimeout(ctx: *anyopaque, user_data: usize) void {
    // The pointer itself is never reassigned, so bind it as a constant.
    const self: *@This() = @ptrCast(@alignCast(ctx));
    self.uringlator.finish(@intCast(user_data), error.Success);
}

fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
if (self.readiness[id].mode == .nopoll or self.readiness[id].mode == .kludge or self.pending.isSet(id)) {
switch (uop.*) {
.timeout => self.uringlator.finish(id, error.Success),
.link_timeout => self.uringlator.finishLinkTimeout(id),
inline .timeout, .link_timeout => |*op| {
const closure: TimerQueue.Closure = .{ .context = self, .callback = onThreadTimeout };
try self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure });
},
// can be performed here, doesn't have to be dispatched to thread
inline .child_exit, .notify_event_source, .wait_event_source, .close_event_source => |*op| {
var failure: Operation.Error = error.Success;
Expand All @@ -197,7 +208,6 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
Uringlator.debug("pending: {}: {}", .{ id, std.meta.activeTag(uop.*) });
}
std.debug.assert(self.readiness[id].fd != posix.invalid_fd);
try Uringlator.uopUnwrapCall(uop, posix.armReadiness, .{self.readiness[id]});
self.pfd.add(.{
.fd = self.readiness[id].fd,
.events = switch (self.readiness[id].mode) {
Expand All @@ -211,11 +221,18 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
}
}

fn cancelable(self: *@This(), id: u16, _: *Operation.Union) bool {
return self.pending.isSet(id);
/// Reports whether operation `id` may be canceled: either it is still marked
/// pending, or it is a `timeout` / `link_timeout` operation.
fn cancelable(self: *@This(), id: u16, uop: *Operation.Union) bool {
    if (self.pending.isSet(id)) return true;
    return switch (uop.*) {
        .timeout, .link_timeout => true,
        else => false,
    };
}

fn completion(self: *@This(), id: u16, uop: *Operation.Union) void {
switch (uop.*) {
.timeout, .link_timeout => self.tqueue.disarm(.monotonic, id),
else => {},
}
if (self.readiness[id].fd != posix.invalid_fd) {
for (self.pfd.items[0..self.pfd.len], 0..) |pfd, idx| {
if (pfd.fd == self.readiness[id].fd) {
Expand Down
53 changes: 27 additions & 26 deletions src/aio/Uringlator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,6 @@ pub fn submit(
return true;
}

/// Finishes the link timeout operation `id`.
/// Walks the live operations looking for the first other operation whose
/// `next` entry points at `id`. If one exists it is finished with
/// `error.Canceled`, unlinked (its `next` entry is reset to itself), and `id`
/// is finished with `error.Expired`. If none exists, `id` is finished with
/// `error.Success`.
pub fn finishLinkTimeout(self: *@This(), id: u16) void {
    var ops_iter = self.ops.iterator();
    while (ops_iter.next()) |entry| {
        const links_to_us = entry.k != id and self.next[entry.k] == id;
        if (!links_to_us) continue;
        self.finish(entry.k, error.Canceled);
        self.next[entry.k] = entry.k;
        self.finish(id, error.Expired);
        return;
    }
    self.finish(id, error.Success);
}

fn start(
self: *@This(),
id: u16,
Expand Down Expand Up @@ -207,22 +188,42 @@ pub fn complete(
const finished = self.finished.swap();
var num_errors: u16 = 0;
for (finished) |res| {
if (res.failure != error.Success) {
debug("complete: {}: {} [FAIL] {}", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used), res.failure });
var failure = res.failure;
if (self.ops.nodes[res.id].used == .link_timeout and failure != error.Canceled) {
var iter = self.ops.iterator();
const cres: enum { ok, not_found } = blk: {
while (iter.next()) |e| {
if (e.k != res.id and self.next[e.k] == res.id) {
self.finish(e.k, error.Canceled);
self.next[e.k] = e.k;
break :blk .ok;
}
}
break :blk .not_found;
};
if (cres == .ok) {
failure = error.Expired;
} else {
failure = error.Success;
}
}

if (failure != error.Success) {
debug("complete: {}: {} [FAIL] {}", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used), failure });
} else {
debug("complete: {}: {} [OK]", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used) });
}

if (self.ops.nodes[res.id].used == .link_timeout and res.failure == error.Canceled) {
if (self.ops.nodes[res.id].used == .link_timeout and failure == error.Canceled) {
// special case
} else {
num_errors += @intFromBool(res.failure != error.Success);
num_errors += @intFromBool(failure != error.Success);
}

uopUnwrapCall(&self.ops.nodes[res.id].used, completition, .{ self, res });
uopUnwrapCall(&self.ops.nodes[res.id].used, completition, .{ self, .{ .id = res.id, .failure = failure } });

var uop = self.ops.nodes[res.id].used;
if (cb) |f| f(uop, @enumFromInt(res.id), res.failure != error.Success);
if (cb) |f| f(uop, @enumFromInt(res.id), failure != error.Success);
completion_cb(ctx, res.id, &uop);
self.removeOp(res.id);
}
Expand Down Expand Up @@ -261,7 +262,7 @@ pub fn uopUnwrapCall(uop: *Operation.Union, comptime func: anytype, args: anytyp

pub fn debug(comptime fmt: []const u8, args: anytype) void {
if (@import("builtin").is_test) {
std.debug.print("fallback: " ++ fmt ++ "\n", args);
std.debug.print("uringlator: " ++ fmt ++ "\n", args);
} else {
if (comptime !aio.options.debug) return;
log.debug(fmt, args);
Expand Down
24 changes: 1 addition & 23 deletions src/aio/posix.zig
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ pub const ChildWatcher = switch (builtin.target.os.tag) {
else => @compileError("unsupported"),
};

/// Timer implementation for the compilation target, chosen at compile time
/// from `builtin.target.os.tag`. The BSD family and the Apple targets both
/// resolve to a `Timer` exported by their respective platform module; any
/// target not listed here fails to compile with "unsupported".
pub const Timer = switch (builtin.target.os.tag) {
.linux => linux.Timer,
.windows => windows.Timer,
.freebsd, .openbsd, .dragonfly, .netbsd => bsd.Timer,
.macos, .ios, .watchos, .visionos, .tvos => darwin.Timer,
else => @compileError("unsupported"),
};

pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O {
var os_flags: std.posix.O = .{
.ACCMODE = switch (flags.mode) {
Expand Down Expand Up @@ -252,7 +244,7 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness {
else => .{ .fd = op.socket, .mode = .out },
},
.open_at, .close_file, .close_dir, .close_socket => .{},
.timeout, .link_timeout => .{ .fd = (try Timer.init(.monotonic)).fd, .mode = .in },
.timeout, .link_timeout => .{},
.cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => .{},
.child_exit => .{ .fd = (try ChildWatcher.init(op.child)).fd, .mode = .in },
.wait_event_source => op.source.native.waitReadiness(),
Expand All @@ -266,23 +258,9 @@ pub const ArmReadinessError = error{
Unexpected,
};

/// Arms the readiness source for `op` before it is polled.
/// Only `timeout` and `link_timeout` payloads need arming: their readiness fd
/// is wrapped in a monotonic `Timer` and set to `op.ns`. Every other payload
/// type is a no-op.
pub inline fn armReadiness(op: anytype, readiness: Readiness) ArmReadinessError!void {
    const tag = comptime Operation.tagFromPayloadType(@TypeOf(op.*));
    const is_timer_op = comptime (tag == .timeout or tag == .link_timeout);
    // Comptime guard: the timer code is only instantiated for payloads that have `ns`.
    if (is_timer_op) {
        var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic };
        try timer.set(op.ns);
    }
}

pub inline fn closeReadiness(op: anytype, readiness: Readiness) void {
if (readiness.fd == invalid_fd) return;
switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
.timeout, .link_timeout => {
var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic };
timer.deinit();
},
.child_exit => {
var watcher: ChildWatcher = .{ .id = op.child, .fd = readiness.fd };
watcher.deinit();
Expand Down
37 changes: 0 additions & 37 deletions src/aio/posix/bsd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -109,40 +109,3 @@ pub const ChildWatcher = struct {
self.* = undefined;
}
};

/// One-shot timer built on a dedicated kqueue descriptor and an
/// `EVFILT_TIMER` event.
pub const Timer = struct {
// kqueue descriptor created by `init` and closed by `deinit`
fd: std.posix.fd_t,
// clock requested by the caller; stored only, `set` does not read it
clock: posix.Clock,

/// Creates the kqueue descriptor that will carry the timer event.
pub fn init(clock: posix.Clock) !@This() {
return .{ .fd = try std.posix.kqueue(), .clock = clock };
}

/// Registers a one-shot (`EV_ONESHOT`) timer event for `ns` nanoseconds on
/// the timer's kqueue. `error.SystemResources` is propagated; every other
/// error that is not asserted impossible is reported as `error.Unexpected`.
pub fn set(self: *@This(), ns: u128) !void {
// empty event list + null timeout: this call only submits the change
_ = std.posix.kevent(self.fd, &.{.{
.ident = @intCast(self.fd),
.filter = std.posix.system.EVFILT_TIMER,
.flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT,
// dragonfly gets no unit flag; everywhere else the value is in nanoseconds
.fflags = switch (builtin.target.os.tag) {
.dragonfly => 0,
else => NOTE_NSECONDS,
},
// NOTE(review): @intCast narrows the u128 duration to the kevent data
// field; assumes the value fits — confirm against callers.
.data = switch (builtin.target.os.tag) {
// converted to milliseconds, so sub-millisecond precision is dropped
.dragonfly => @intCast(ns / std.time.ns_per_ms), // :sadface:
else => @intCast(ns), // :sadface:
},
.udata = 0,
}}, &.{}, null) catch |err| return switch (err) {
error.EventNotFound => unreachable,
error.ProcessNotFound => unreachable,
error.AccessDenied => unreachable,
error.SystemResources => |e| e,
else => error.Unexpected,
};
}

/// Closes the kqueue descriptor and invalidates the struct.
pub fn deinit(self: *@This()) void {
std.posix.close(self.fd);
self.* = undefined;
}
};
1 change: 0 additions & 1 deletion src/aio/posix/darwin.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const bsd = @import("bsd.zig");

pub const EventSource = bsd.EventSource;
pub const ChildWatcher = bsd.ChildWatcher;
pub const Timer = bsd.Timer;

pub const msghdr = extern struct {
/// Optional address.
Expand Down
39 changes: 0 additions & 39 deletions src/aio/posix/linux.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,6 @@ pub const ChildWatcher = struct {
}
};

/// Single-shot timer backed by a Linux timerfd.
pub const Timer = struct {
    /// Descriptor returned by `timerfd_create`; closed by `deinit`.
    fd: std.posix.fd_t,
    /// Clock the timerfd was created against.
    clock: posix.Clock,

    /// Creates a non-blocking, close-on-exec timerfd for `clock`.
    /// Any creation error other than `AccessDenied` (asserted impossible)
    /// is returned to the caller.
    pub fn init(clock: posix.Clock) !@This() {
        const fd = std.posix.timerfd_create(switch (clock) {
            .monotonic => std.posix.CLOCK.MONOTONIC,
            .boottime => std.posix.CLOCK.BOOTTIME,
            .realtime => std.posix.CLOCK.REALTIME,
        }, .{
            .CLOEXEC = true,
            .NONBLOCK = true,
        }) catch |err| return switch (err) {
            error.AccessDenied => unreachable,
            else => |e| e,
        };
        return .{ .fd = fd, .clock = clock };
    }

    /// Arms the timer to expire once after `ns` nanoseconds (no interval).
    /// A zero `ns` is rounded up to one nanosecond so that the timer still
    /// fires instead of being disarmed.
    pub fn set(self: *@This(), ns: u128) !void {
        // timerfd_settime treats an all-zero it_value as "disarm", which would
        // make a zero-length timeout wait forever. Use the smallest non-zero
        // delay instead.
        const delay: u128 = @max(ns, 1);
        const ts: std.os.linux.itimerspec = .{
            .it_value = .{
                .tv_sec = @intCast(delay / std.time.ns_per_s),
                .tv_nsec = @intCast(delay % std.time.ns_per_s),
            },
            // zero interval: the timer does not repeat
            .it_interval = .{ .tv_sec = 0, .tv_nsec = 0 },
        };
        _ = std.posix.timerfd_settime(self.fd, .{}, &ts, null) catch |err| return switch (err) {
            error.Canceled, error.InvalidHandle => unreachable,
            error.Unexpected => |e| e,
        };
    }

    /// Closes the timerfd and invalidates the struct.
    pub fn deinit(self: *@This()) void {
        std.posix.close(self.fd);
        self.* = undefined;
    }
};

// std.os.linux.errnoFromSyscall is not pub :(
fn errnoFromSyscall(r: usize) std.os.linux.E {
const signed_r: isize = @bitCast(r);
Expand Down
37 changes: 0 additions & 37 deletions src/aio/posix/windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -85,43 +85,6 @@ pub const ChildWatcher = struct {
}
};

/// Single-shot timer backed by a Win32 waitable timer handle.
pub const Timer = struct {
    /// Handle returned by `CreateWaitableTimerW`; closed by `deinit`.
    fd: std.posix.fd_t,
    /// Clock requested by the caller; stored only, `set` does not read it.
    clock: posix.Clock,

    /// Creates an unnamed waitable timer. A null handle from the OS is
    /// reported as `error.SystemResources`.
    pub fn init(clock: posix.Clock) !@This() {
        return .{
            .fd = try (threading.CreateWaitableTimerW(null, 0, null) orelse error.SystemResources),
            .clock = clock,
        };
    }

    /// Converts a signed nanosecond count into the 100 ns ticks carried by a
    /// Win32 `LARGE_INTEGER`, preserving the sign.
    fn nanoSecondsToTimerTime(ns: i128) win32.foundation.LARGE_INTEGER {
        const LARGE_INTEGER = packed struct(u64) { l: u32, h: u32 };
        const signed: i64 = @intCast(@divFloor(ns, 100));
        const adjusted: u64 = @bitCast(signed);
        return .{
            // The first field of a packed struct occupies the least
            // significant bits, so `l` must receive the low word and `h` the
            // high word; the reverse order scrambles the tick count.
            .QuadPart = @bitCast(LARGE_INTEGER{
                .l = @truncate(adjusted),
                .h = @truncate(adjusted >> 32),
            }),
        };
    }

    /// Arms the timer to fire once, `ns` nanoseconds from now.
    /// The duration is negated before conversion; Win32 interprets a negative
    /// due time as relative to the current time.
    pub fn set(self: *@This(), ns: u128) !void {
        const rel_time: i128 = @intCast(ns);
        const li = nanoSecondsToTimerTime(-rel_time);
        if (threading.SetWaitableTimer(self.fd, &li, 0, null, null, 0) == 0) {
            return unexpectedError(GetLastError());
        }
    }

    /// Closes the timer handle and invalidates the struct.
    pub fn deinit(self: *@This()) void {
        checked(CloseHandle(self.fd));
        self.* = undefined;
    }
};

pub fn translateTty(_: std.posix.fd_t, _: []u8, _: *ops.ReadTty.TranslationState) ops.ReadTty.Error!usize {
if (true) @panic("TODO");
return 0;
Expand Down
1 change: 1 addition & 0 deletions src/minilib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub const DynamicThreadPool = @import("minilib/DynamicThreadPool.zig");
pub const FixedArrayList = @import("minilib/fixed_array_list.zig").FixedArrayList;
pub const DoubleBufferedFixedArrayList = @import("minilib/fixed_array_list.zig").DoubleBufferedFixedArrayList;
pub const ItemPool = @import("minilib/item_pool.zig").ItemPool;
pub const TimerQueue = @import("minilib/TimerQueue.zig");

const std = @import("std");

Expand Down
Loading

0 comments on commit d80b6e2

Please sign in to comment.