diff --git a/src/aio.zig b/src/aio.zig
index c4ebf0b..0ef29fd 100644
--- a/src/aio.zig
+++ b/src/aio.zig
@@ -38,7 +38,11 @@ pub const Error = error{
     ProcessFdQuotaExceeded,
     SystemFdQuotaExceeded,
     SystemResources,
+    UserResourceLimitReached,
+    ThreadQuotaExceeded,
+    LockedMemoryLimitExceeded,
     SystemOutdated,
+    Unsupported,
     Unexpected,
 };
diff --git a/src/aio/Fallback.zig b/src/aio/Fallback.zig
index 4f11f58..a2b9a99 100644
--- a/src/aio/Fallback.zig
+++ b/src/aio/Fallback.zig
@@ -6,6 +6,7 @@ const Operation = @import("ops.zig").Operation;
 const ItemPool = @import("minilib").ItemPool;
 const FixedArrayList = @import("minilib").FixedArrayList;
 const DoubleBufferedFixedArrayList = @import("minilib").DoubleBufferedFixedArrayList;
+const TimerQueue = @import("minilib").TimerQueue;
 const DynamicThreadPool = @import("minilib").DynamicThreadPool;
 const Uringlator = @import("Uringlator.zig");
 const log = std.log.scoped(.aio_fallback);
@@ -28,8 +29,7 @@ comptime {
 
 pub const EventSource = posix.EventSource;
 
-const Result = struct { failure: Operation.Error, id: u16 };
-
+tqueue: TimerQueue, // timer queue implementing linux-like timers
 readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation
 pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup
 tpool: DynamicThreadPool, // thread pool for performing operations, not all operations will be performed here
@@ -42,6 +42,8 @@ pub fn isSupported(_: []const type) bool {
 }
 
 pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
+    var tqueue = try TimerQueue.init(allocator);
+    errdefer tqueue.deinit();
     const readiness = try allocator.alloc(posix.Readiness, n);
     errdefer allocator.free(readiness);
     @memset(readiness, .{});
@@ -63,6 +65,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
     errdefer uringlator.deinit(allocator);
     pfd.add(.{ .fd = uringlator.source.fd, .events = std.posix.POLL.IN, .revents = 0 }) catch unreachable;
     return .{
+        .tqueue = tqueue,
         .readiness = readiness,
         .pfd = pfd,
         .tpool = tpool,
@@ -73,6 +76,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
 }
 
 pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
+    self.tqueue.deinit();
     self.tpool.deinit();
     self.kludge_tpool.deinit();
     var iter = self.pending.iterator(.{});
@@ -167,11 +171,18 @@ fn onThreadExecutor(self: *@This(), id: u16, uop: *Operation.Union, readiness: p
     self.uringlator.finish(id, failure);
 }
 
+fn onThreadTimeout(ctx: *anyopaque, user_data: usize) void {
+    const self: *@This() = @ptrCast(@alignCast(ctx));
+    self.uringlator.finish(@intCast(user_data), error.Success);
+}
+
 fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
     if (self.readiness[id].mode == .nopoll or self.readiness[id].mode == .kludge or self.pending.isSet(id)) {
         switch (uop.*) {
-            .timeout => self.uringlator.finish(id, error.Success),
-            .link_timeout => self.uringlator.finishLinkTimeout(id),
+            inline .timeout, .link_timeout => |*op| {
+                const closure: TimerQueue.Closure = .{ .context = self, .callback = onThreadTimeout };
+                try self.tqueue.schedule(.monotonic, op.ns, id, .{ .closure = closure });
+            },
             // can be performed here, doesn't have to be dispatched to thread
             inline .child_exit, .notify_event_source, .wait_event_source, .close_event_source => |*op| {
                 var failure: Operation.Error = error.Success;
@@ -197,7 +208,6 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
             Uringlator.debug("pending: {}: {}", .{ id, std.meta.activeTag(uop.*) });
         }
         std.debug.assert(self.readiness[id].fd != posix.invalid_fd);
-        try Uringlator.uopUnwrapCall(uop, posix.armReadiness, .{self.readiness[id]});
         self.pfd.add(.{
             .fd = self.readiness[id].fd,
             .events = switch (self.readiness[id].mode) {
@@ -211,11 +221,18 @@ fn start(self: *@This(), id: u16, uop: *Operation.Union) !void {
     }
 }
 
-fn cancelable(self: *@This(), id: u16, _: *Operation.Union) bool {
-    return self.pending.isSet(id);
+fn cancelable(self: *@This(), id: u16, uop: *Operation.Union) bool {
+    return self.pending.isSet(id) or switch (uop.*) {
+        .timeout, .link_timeout => true,
+        else => false,
+    };
 }
 
 fn completion(self: *@This(), id: u16, uop: *Operation.Union) void {
+    switch (uop.*) {
+        .timeout, .link_timeout => self.tqueue.disarm(.monotonic, id),
+        else => {},
+    }
     if (self.readiness[id].fd != posix.invalid_fd) {
         for (self.pfd.items[0..self.pfd.len], 0..) |pfd, idx| {
             if (pfd.fd == self.readiness[id].fd) {
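Note on the pattern above: `onThreadTimeout` recovers the typed backend pointer from the `*anyopaque` context carried by `TimerQueue.Closure` (added later in this patch), while the operation id rides along in `user_data`. A minimal, self-contained sketch of that round-trip; the `Backend` type here is hypothetical and only illustrates the cast:

    const std = @import("std");
    const TimerQueue = @import("minilib").TimerQueue;

    const Backend = struct {
        fired: bool = false,

        fn onTimeout(ctx: *anyopaque, user_data: usize) void {
            // recover the pointer that was type-erased when the closure was built
            const self: *Backend = @ptrCast(@alignCast(ctx));
            _ = user_data; // Fallback packs the operation id in here
            self.fired = true;
        }
    };

    test "closure round-trip" {
        var backend: Backend = .{};
        const closure: TimerQueue.Closure = .{ .context = &backend, .callback = Backend.onTimeout };
        closure.callback(closure.context, 42); // what the queue's thread does on expiry
        try std.testing.expect(backend.fired);
    }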
std.debug.print("fallback: " ++ fmt ++ "\n", args); + std.debug.print("uringlator: " ++ fmt ++ "\n", args); } else { if (comptime !aio.options.debug) return; log.debug(fmt, args); diff --git a/src/aio/posix.zig b/src/aio/posix.zig index 1a747f6..a9cf6e0 100644 --- a/src/aio/posix.zig +++ b/src/aio/posix.zig @@ -33,14 +33,6 @@ pub const ChildWatcher = switch (builtin.target.os.tag) { else => @compileError("unsupported"), }; -pub const Timer = switch (builtin.target.os.tag) { - .linux => linux.Timer, - .windows => windows.Timer, - .freebsd, .openbsd, .dragonfly, .netbsd => bsd.Timer, - .macos, .ios, .watchos, .visionos, .tvos => darwin.Timer, - else => @compileError("unsupported"), -}; - pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O { var os_flags: std.posix.O = .{ .ACCMODE = switch (flags.mode) { @@ -252,7 +244,7 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness { else => .{ .fd = op.socket, .mode = .out }, }, .open_at, .close_file, .close_dir, .close_socket => .{}, - .timeout, .link_timeout => .{ .fd = (try Timer.init(.monotonic)).fd, .mode = .in }, + .timeout, .link_timeout => .{}, .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => .{}, .child_exit => .{ .fd = (try ChildWatcher.init(op.child)).fd, .mode = .in }, .wait_event_source => op.source.native.waitReadiness(), @@ -266,23 +258,9 @@ pub const ArmReadinessError = error{ Unexpected, }; -pub inline fn armReadiness(op: anytype, readiness: Readiness) ArmReadinessError!void { - switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { - .timeout, .link_timeout => { - var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic }; - try timer.set(op.ns); - }, - else => {}, - } -} - pub inline fn closeReadiness(op: anytype, readiness: Readiness) void { if (readiness.fd == invalid_fd) return; switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) { - .timeout, .link_timeout => { - var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic }; - timer.deinit(); - }, .child_exit => { var watcher: ChildWatcher = .{ .id = op.child, .fd = readiness.fd }; watcher.deinit(); diff --git a/src/aio/posix/bsd.zig b/src/aio/posix/bsd.zig index 0c3846a..8e429ee 100644 --- a/src/aio/posix/bsd.zig +++ b/src/aio/posix/bsd.zig @@ -109,40 +109,3 @@ pub const ChildWatcher = struct { self.* = undefined; } }; - -pub const Timer = struct { - fd: std.posix.fd_t, - clock: posix.Clock, - - pub fn init(clock: posix.Clock) !@This() { - return .{ .fd = try std.posix.kqueue(), .clock = clock }; - } - - pub fn set(self: *@This(), ns: u128) !void { - _ = std.posix.kevent(self.fd, &.{.{ - .ident = @intCast(self.fd), - .filter = std.posix.system.EVFILT_TIMER, - .flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT, - .fflags = switch (builtin.target.os.tag) { - .dragonfly => 0, - else => NOTE_NSECONDS, - }, - .data = switch (builtin.target.os.tag) { - .dragonfly => @intCast(ns / std.time.ns_per_ms), // :sadface: - else => @intCast(ns), // :sadface: - }, - .udata = 0, - }}, &.{}, null) catch |err| return switch (err) { - error.EventNotFound => unreachable, - error.ProcessNotFound => unreachable, - error.AccessDenied => unreachable, - error.SystemResources => |e| e, - else => error.Unexpected, - }; - } - - pub fn deinit(self: *@This()) void { - std.posix.close(self.fd); - self.* = undefined; - } -}; diff --git a/src/aio/posix/darwin.zig b/src/aio/posix/darwin.zig index 3f82030..2277473 100644 --- a/src/aio/posix/darwin.zig +++ b/src/aio/posix/darwin.zig @@ -3,7 +3,6 @@ const 
diff --git a/src/aio/posix.zig b/src/aio/posix.zig
index 1a747f6..a9cf6e0 100644
--- a/src/aio/posix.zig
+++ b/src/aio/posix.zig
@@ -33,14 +33,6 @@ pub const ChildWatcher = switch (builtin.target.os.tag) {
     else => @compileError("unsupported"),
 };
 
-pub const Timer = switch (builtin.target.os.tag) {
-    .linux => linux.Timer,
-    .windows => windows.Timer,
-    .freebsd, .openbsd, .dragonfly, .netbsd => bsd.Timer,
-    .macos, .ios, .watchos, .visionos, .tvos => darwin.Timer,
-    else => @compileError("unsupported"),
-};
-
 pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O {
     var os_flags: std.posix.O = .{
         .ACCMODE = switch (flags.mode) {
@@ -252,7 +244,7 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness {
         else => .{ .fd = op.socket, .mode = .out },
     },
     .open_at, .close_file, .close_dir, .close_socket => .{},
-    .timeout, .link_timeout => .{ .fd = (try Timer.init(.monotonic)).fd, .mode = .in },
+    .timeout, .link_timeout => .{},
     .cancel, .rename_at, .unlink_at, .mkdir_at, .symlink_at => .{},
     .child_exit => .{ .fd = (try ChildWatcher.init(op.child)).fd, .mode = .in },
     .wait_event_source => op.source.native.waitReadiness(),
@@ -266,23 +258,9 @@ pub const ArmReadinessError = error{
     Unexpected,
 };
 
-pub inline fn armReadiness(op: anytype, readiness: Readiness) ArmReadinessError!void {
-    switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
-        .timeout, .link_timeout => {
-            var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic };
-            try timer.set(op.ns);
-        },
-        else => {},
-    }
-}
-
 pub inline fn closeReadiness(op: anytype, readiness: Readiness) void {
     if (readiness.fd == invalid_fd) return;
     switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
-        .timeout, .link_timeout => {
-            var timer: Timer = .{ .fd = readiness.fd, .clock = .monotonic };
-            timer.deinit();
-        },
         .child_exit => {
             var watcher: ChildWatcher = .{ .id = op.child, .fd = readiness.fd };
             watcher.deinit();
diff --git a/src/aio/posix/bsd.zig b/src/aio/posix/bsd.zig
index 0c3846a..8e429ee 100644
--- a/src/aio/posix/bsd.zig
+++ b/src/aio/posix/bsd.zig
@@ -109,40 +109,3 @@ pub const ChildWatcher = struct {
         self.* = undefined;
     }
 };
-
-pub const Timer = struct {
-    fd: std.posix.fd_t,
-    clock: posix.Clock,
-
-    pub fn init(clock: posix.Clock) !@This() {
-        return .{ .fd = try std.posix.kqueue(), .clock = clock };
-    }
-
-    pub fn set(self: *@This(), ns: u128) !void {
-        _ = std.posix.kevent(self.fd, &.{.{
-            .ident = @intCast(self.fd),
-            .filter = std.posix.system.EVFILT_TIMER,
-            .flags = std.posix.system.EV_ADD | std.posix.system.EV_ENABLE | std.posix.system.EV_ONESHOT,
-            .fflags = switch (builtin.target.os.tag) {
-                .dragonfly => 0,
-                else => NOTE_NSECONDS,
-            },
-            .data = switch (builtin.target.os.tag) {
-                .dragonfly => @intCast(ns / std.time.ns_per_ms), // :sadface:
-                else => @intCast(ns), // :sadface:
-            },
-            .udata = 0,
-        }}, &.{}, null) catch |err| return switch (err) {
-            error.EventNotFound => unreachable,
-            error.ProcessNotFound => unreachable,
-            error.AccessDenied => unreachable,
-            error.SystemResources => |e| e,
-            else => error.Unexpected,
-        };
-    }
-
-    pub fn deinit(self: *@This()) void {
-        std.posix.close(self.fd);
-        self.* = undefined;
-    }
-};
diff --git a/src/aio/posix/darwin.zig b/src/aio/posix/darwin.zig
index 3f82030..2277473 100644
--- a/src/aio/posix/darwin.zig
+++ b/src/aio/posix/darwin.zig
@@ -3,7 +3,6 @@ const bsd = @import("bsd.zig");
 
 pub const EventSource = bsd.EventSource;
 pub const ChildWatcher = bsd.ChildWatcher;
-pub const Timer = bsd.Timer;
 
 pub const msghdr = extern struct {
     /// Optional address.
diff --git a/src/aio/posix/linux.zig b/src/aio/posix/linux.zig
index 1316650..464258a 100644
--- a/src/aio/posix/linux.zig
+++ b/src/aio/posix/linux.zig
@@ -57,45 +57,6 @@ pub const ChildWatcher = struct {
     }
 };
 
-pub const Timer = struct {
-    fd: std.posix.fd_t,
-    clock: posix.Clock,
-
-    pub fn init(clock: posix.Clock) !@This() {
-        const fd = std.posix.timerfd_create(switch (clock) {
-            .monotonic => std.posix.CLOCK.MONOTONIC,
-            .boottime => std.posix.CLOCK.BOOTTIME,
-            .realtime => std.posix.CLOCK.REALTIME,
-        }, .{
-            .CLOEXEC = true,
-            .NONBLOCK = true,
-        }) catch |err| return switch (err) {
-            error.AccessDenied => unreachable,
-            else => |e| e,
-        };
-        return .{ .fd = fd, .clock = clock };
-    }
-
-    pub fn set(self: *@This(), ns: u128) !void {
-        const ts: std.os.linux.itimerspec = .{
-            .it_value = .{
-                .tv_sec = @intCast(ns / std.time.ns_per_s),
-                .tv_nsec = @intCast(ns % std.time.ns_per_s),
-            },
-            .it_interval = .{ .tv_sec = 0, .tv_nsec = 0 },
-        };
-        _ = std.posix.timerfd_settime(self.fd, .{}, &ts, null) catch |err| return switch (err) {
-            error.Canceled, error.InvalidHandle => unreachable,
-            error.Unexpected => |e| e,
-        };
-    }
-
-    pub fn deinit(self: *@This()) void {
-        std.posix.close(self.fd);
-        self.* = undefined;
-    }
-};
-
 // std.os.linux.errnoFromSyscall is not pub :(
 fn errnoFromSyscall(r: usize) std.os.linux.E {
     const signed_r: isize = @bitCast(r);
diff --git a/src/aio/posix/windows.zig b/src/aio/posix/windows.zig
index 178f21e..dd19a98 100644
--- a/src/aio/posix/windows.zig
+++ b/src/aio/posix/windows.zig
@@ -85,43 +85,6 @@ pub const ChildWatcher = struct {
     }
 };
 
-pub const Timer = struct {
-    fd: std.posix.fd_t,
-    clock: posix.Clock,
-
-    pub fn init(clock: posix.Clock) !@This() {
-        return .{
-            .fd = try (threading.CreateWaitableTimerW(null, 0, null) orelse error.SystemResources),
-            .clock = clock,
-        };
-    }
-
-    fn nanoSecondsToTimerTime(ns: i128) win32.foundation.LARGE_INTEGER {
-        const LARGE_INTEGER = packed struct(u64) { l: u32, h: u32 };
-        const signed: i64 = @intCast(@divFloor(ns, 100));
-        const adjusted: u64 = @bitCast(signed);
-        return .{
-            .QuadPart = @bitCast(LARGE_INTEGER{
-                .l = @truncate(adjusted >> 32),
-                .h = @truncate(adjusted),
-            }),
-        };
-    }
-
-    pub fn set(self: *@This(), ns: u128) !void {
-        const rel_time: i128 = @intCast(ns);
-        const li = nanoSecondsToTimerTime(-rel_time);
-        if (threading.SetWaitableTimer(self.fd, &li, 0, null, null, 0) == 0) {
-            return unexpectedError(GetLastError());
-        }
-    }
-
-    pub fn deinit(self: *@This()) void {
-        checked(CloseHandle(self.fd));
-        self.* = undefined;
-    }
-};
-
 pub fn translateTty(_: std.posix.fd_t, _: []u8, _: *ops.ReadTty.TranslationState) ops.ReadTty.Error!usize {
     if (true) @panic("TODO");
     return 0;
diff --git a/src/minilib.zig b/src/minilib.zig
index 7756cef..927053f 100644
--- a/src/minilib.zig
+++ b/src/minilib.zig
@@ -2,6 +2,7 @@ pub const DynamicThreadPool = @import("minilib/DynamicThreadPool.zig");
 pub const FixedArrayList = @import("minilib/fixed_array_list.zig").FixedArrayList;
 pub const DoubleBufferedFixedArrayList = @import("minilib/fixed_array_list.zig").DoubleBufferedFixedArrayList;
 pub const ItemPool = @import("minilib/item_pool.zig").ItemPool;
+pub const TimerQueue = @import("minilib/TimerQueue.zig");
 
 const std = @import("std");
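With `TimerQueue` exported from minilib, its options can be overridden the usual Zig way through a declaration in the consumer's root module; the probe for this is the `@hasDecl(root, "timer_queue_options")` check in the new `TimerQueue.zig` below. A sketch (assuming the application can import minilib under that name):

    // in the application's root source file
    pub const timer_queue_options: @import("minilib").TimerQueue.Options = .{
        .force_foreign_backend = true, // exercise the portable path even on linux
    };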
diff --git a/src/minilib/DynamicThreadPool.zig b/src/minilib/DynamicThreadPool.zig
index 600eda4..8da208a 100644
--- a/src/minilib/DynamicThreadPool.zig
+++ b/src/minilib/DynamicThreadPool.zig
@@ -1,7 +1,5 @@
 //! Basically `std.Thread.Pool` but supports timeout
 //! That is, if threads have been inactive for specific timeout the pool will release the threads
-//! The `num_threads` are also the maximum count of worker threads, but if there's not much activity
-//! less threads are used.
 
 const builtin = @import("builtin");
 const std = @import("std");
@@ -42,7 +40,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) InitError!@This() {
     _ = try std.time.Timer.start(); // check that we have a timer
 
-    const thread_count = options.max_threads orelse @max(1, std.Thread.getCpuCount() catch 1);
+    const thread_count = @max(1, options.max_threads orelse std.Thread.getCpuCount() catch 1);
     var serial = try std.DynamicBitSetUnmanaged.initEmpty(allocator, thread_count);
     errdefer serial.deinit(allocator);
     const threads = try allocator.alloc(DynamicThread, thread_count);
diff --git a/src/minilib/TimerQueue.zig b/src/minilib/TimerQueue.zig
new file mode 100644
index 0000000..ce43064
--- /dev/null
+++ b/src/minilib/TimerQueue.zig
@@ -0,0 +1,475 @@
+//! Mimics linux's timerfd timers
+//! Used on platforms where native timers aren't accurate enough or have other limitations
+//! Requires threading support
+
+const builtin = @import("builtin");
+const std = @import("std");
+
+const root = @import("root");
+pub const options: Options = if (@hasDecl(root, "timer_queue_options")) root.timer_queue_options else .{};
+
+pub const Options = struct {
+    /// Force the use of foreign backend even if the target platform is linux
+    /// Mostly useful for testing
+    force_foreign_backend: bool = false,
+};
+
+pub const Closure = struct {
+    pub const Callback = *const fn (context: *anyopaque, user_data: usize) void;
+    context: *anyopaque,
+    callback: Callback,
+};
+
+pub const Clock = enum {
+    monotonic,
+    boottime,
+    realtime,
+};
+
+pub const TimeoutOptions = struct {
+    // number of expirations before the timer is disarmed (null = one-shot)
+    repeat: ?u32 = null,
+    // is `ns` an absolute timestamp rather than a relative delay?
+    abs: bool = false,
+    // callback to invoke when the timer expires
+    closure: Closure,
+};
+
+const Timeout = struct {
+    ns: u128,
+    start: u128,
+    opts: TimeoutOptions,
+    user_data: usize,
+
+    pub fn sort(_: void, a: @This(), b: @This()) std.math.Order {
+        return std.math.order(a.start + a.ns, b.start + b.ns); // order by absolute deadline
+    }
+};
+
+/// The 3 queues share lots of similarities, so use a mixin
+fn Mixin(T: type) type {
+    return struct {
+        pub fn init(allocator: std.mem.Allocator) !T {
+            _ = try T.now(); // check that we can get time
+            return .{ .queue = std.PriorityQueue(Timeout, void, Timeout.sort).init(allocator, {}) };
+        }
+
+        pub fn deinit(self: *T) void {
+            if (self.thread) |thrd| {
+                {
+                    self.mutex.lock();
+                    defer self.mutex.unlock();
+                    while (self.queue.removeOrNull()) |_| {}
+                    self.thread = null; // to prevent thread from detaching
+                }
+                self.cond.broadcast();
+                thrd.join();
+            }
+            self.queue.deinit();
+            self.* = undefined;
+        }
+
+        pub fn schedule(self: *T, timeout: Timeout) !void {
+            {
+                self.mutex.lock();
+                defer self.mutex.unlock();
+                try self.queue.add(timeout);
+            }
+            self.cond.broadcast();
+            {
+                self.mutex.lock();
+                defer self.mutex.unlock();
+                if (self.thread == null) try self.start();
+            }
+        }
+
+        fn start(self: *T) !void {
+            @setCold(true);
+            if (self.thread) |_| unreachable;
+            self.thread = try std.Thread.spawn(.{ .allocator = self.queue.allocator }, T.threadMain, .{self});
+        }
+
+        pub fn disarm(self: *T, user_data: usize) void {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            for (self.queue.items, 0..) |*to, idx| {
+                if (to.user_data != user_data) continue;
+                _ = self.queue.removeIndex(idx);
+                break;
+            }
+        }
+    };
+}
+
+/// Monotonic timers by linux definition do not count the suspend time
+/// This is the simplest one to implement, as we don't have to register suspend callbacks from the OS
+const MonotonicQueue = struct {
+    thread: ?std.Thread = null,
+    queue: std.PriorityQueue(Timeout, void, Timeout.sort),
+    mutex: std.Thread.Mutex = .{},
+    cond: std.Thread.Condition = .{},
+    usingnamespace Mixin(@This());
+
+    pub fn now() !u128 {
+        const clock_id = switch (builtin.os.tag) {
+            .windows => {
+                // TODO: make this equivalent to linux MONOTONIC
+                //
+                const qpc = std.os.windows.QueryPerformanceCounter();
+                const qpf = std.os.windows.QueryPerformanceFrequency();
+
+                // 10Mhz (1 qpc tick every 100ns) is a common enough QPF value that we can optimize on it.
+                // https://github.com/microsoft/STL/blob/785143a0c73f030238ef618890fd4d6ae2b3a3a0/stl/inc/chrono#L694-L701
+                const common_qpf = 10_000_000;
+                if (qpf == common_qpf) {
+                    return qpc * (std.time.ns_per_s / common_qpf);
+                }
+
+                // Convert to ns using fixed point.
+                const scale = @as(u64, std.time.ns_per_s << 32) / @as(u32, @intCast(qpf));
+                const result = (@as(u96, qpc) * scale) >> 32;
+                return result;
+            },
+            .wasi => {
+                var ns: std.os.wasi.timestamp_t = undefined;
+                const rc = std.os.wasi.clock_time_get(.MONOTONIC, 1, &ns);
+                if (rc != .SUCCESS) return error.Unsupported;
+                return ns;
+            },
+            .macos, .ios, .tvos, .watchos, .visionos => std.posix.CLOCK.UPTIME_RAW, // excludes time asleep, like linux MONOTONIC
+            .linux => std.posix.CLOCK.MONOTONIC,
+            .uefi => @panic("unsupported"),
+            else => std.posix.CLOCK.BOOTTIME,
+        };
+        var ts: std.posix.timespec = undefined;
+        std.posix.clock_gettime(clock_id, &ts) catch return error.Unsupported;
+        return @abs((@as(i128, ts.tv_sec) * std.time.ns_per_s) + ts.tv_nsec);
+    }
+
+    fn threadMain(self: *@This()) void {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+        self.thread.?.setName("MonotonicQueue") catch {};
+
+        while (self.queue.peek()) |timeout| {
+            {
+                const rn = now() catch unreachable;
+                if (rn < timeout.start + timeout.ns) {
+                    self.cond.timedWait(&self.mutex, @truncate(timeout.start + timeout.ns - rn)) catch {};
+                }
+            }
+            const rn = now() catch unreachable;
+            while (self.queue.peek()) |to| {
+                if (rn >= to.start + to.ns) {
+                    std.log.info("mq timeout! {}", .{to.user_data});
+                    to.opts.closure.callback(to.opts.closure.context, to.user_data);
+                    _ = self.queue.removeOrNull();
+                } else break; // all other timeouts are later
+            }
+        }
+
+        if (self.thread) |thrd| {
+            thrd.detach();
+            self.thread = null;
+        }
+    }
+};
+
+/// Similar to the monotonic queue, but it needs to be woken when the PC comes out of suspend
+/// to check whether any timers expired while asleep
+const BoottimeQueue = struct {
+    thread: ?std.Thread = null,
+    queue: std.PriorityQueue(Timeout, void, Timeout.sort),
+    mutex: std.Thread.Mutex = .{},
+    cond: std.Thread.Condition = .{},
+    usingnamespace Mixin(@This());
+
+    pub fn now() !u128 {
+        const clock_id = switch (builtin.os.tag) {
+            .windows => {
+                const qpc = std.os.windows.QueryPerformanceCounter();
+                const qpf = std.os.windows.QueryPerformanceFrequency();
+
+                // 10Mhz (1 qpc tick every 100ns) is a common enough QPF value that we can optimize on it.
+                // https://github.com/microsoft/STL/blob/785143a0c73f030238ef618890fd4d6ae2b3a3a0/stl/inc/chrono#L694-L701
+                const common_qpf = 10_000_000;
+                if (qpf == common_qpf) {
+                    return qpc * (std.time.ns_per_s / common_qpf);
+                }
+
+                // Convert to ns using fixed point.
+                const scale = @as(u64, std.time.ns_per_s << 32) / @as(u32, @intCast(qpf));
+                const result = (@as(u96, qpc) * scale) >> 32;
+                return result;
+            },
+            .wasi => {
+                var ns: std.os.wasi.timestamp_t = undefined;
+                const rc = std.os.wasi.clock_time_get(.MONOTONIC, 1, &ns);
+                if (rc != .SUCCESS) return error.Unsupported;
+                return ns;
+            },
+            .macos, .ios, .tvos, .watchos, .visionos => std.posix.CLOCK.MONOTONIC_RAW, // counts time asleep, like linux BOOTTIME
+            .linux => std.posix.CLOCK.BOOTTIME,
+            .uefi => @panic("unsupported"),
+            else => std.posix.CLOCK.MONOTONIC,
+        };
+        var ts: std.posix.timespec = undefined;
+        std.posix.clock_gettime(clock_id, &ts) catch return error.Unsupported;
+        return @abs((@as(i128, ts.tv_sec) * std.time.ns_per_s) + ts.tv_nsec);
+    }
+
+    fn threadMain(self: *@This()) void {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+        self.thread.?.setName("BoottimeQueue") catch {};
+
+        while (self.queue.peek()) |timeout| {
+            // TODO: wakeup if coming out from suspend
+            {
+                const rn = now() catch unreachable;
+                if (rn < timeout.start + timeout.ns) {
+                    self.cond.timedWait(&self.mutex, @truncate(timeout.start + timeout.ns - rn)) catch {};
+                }
+            }
+            const rn = now() catch unreachable;
+            while (self.queue.peek()) |to| {
+                if (rn >= to.start + to.ns) {
+                    std.log.info("bq timeout! {}", .{to.user_data});
+                    to.opts.closure.callback(to.opts.closure.context, to.user_data);
+                    _ = self.queue.removeOrNull();
+                } else break; // all other timeouts are later
+            }
+        }
+
+        if (self.thread) |thrd| {
+            thrd.detach();
+            self.thread = null;
+        }
+    }
+};
+
+/// Checks every second whether any timers have expired.
+/// The accuracy is poor, but a one-second poll lets us implement this without much help from the OS.
+const RealtimeQueue = struct {
+    thread: ?std.Thread = null,
+    queue: std.PriorityQueue(Timeout, void, Timeout.sort),
+    mutex: std.Thread.Mutex = .{},
+    cond: std.Thread.Condition = .{},
+    usingnamespace Mixin(@This());
+
+    pub fn now() !u128 {
+        return @abs(std.time.nanoTimestamp());
+    }
+
+    fn threadMain(self: *@This()) void {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+        self.thread.?.setName("RealtimeQueue") catch {};
+
+        while (self.queue.peek()) |_| {
+            self.cond.timedWait(&self.mutex, std.time.ns_per_s) catch {};
+            const rn = now() catch unreachable;
+            while (self.queue.peek()) |to| {
+                if (rn >= to.start + to.ns) {
{}", .{timeout.user_data}); + to.opts.closure.callback(to.opts.closure.context, to.user_data); + _ = self.queue.removeOrNull(); + } else break; // all other timeouts are later + } + } + + if (self.thread) |thrd| { + thrd.detach(); + self.thread = null; + } + } +}; + +const ForeignTimerQueue = struct { + mq: MonotonicQueue, + bq: BoottimeQueue, + rq: RealtimeQueue, + + pub fn init(allocator: std.mem.Allocator) !@This() { + return .{ + .mq = try MonotonicQueue.init(allocator), + .bq = try BoottimeQueue.init(allocator), + .rq = try RealtimeQueue.init(allocator), + }; + } + + pub fn deinit(self: *@This()) void { + self.mq.deinit(); + self.bq.deinit(); + self.rq.deinit(); + self.* = undefined; + } + + pub fn schedule(self: *@This(), clock: Clock, ns: u128, user_data: usize, opts: TimeoutOptions) !void { + if (opts.repeat != null and opts.abs) unreachable; // repeats can't be used with abs + try switch (clock) { + .monotonic => self.mq.schedule(.{ .ns = ns, .start = if (!opts.abs) MonotonicQueue.now() catch unreachable else 0, .opts = opts, .user_data = user_data }), + .boottime => self.bq.schedule(.{ .ns = ns, .start = if (!opts.abs) BoottimeQueue.now() catch unreachable else 0, .opts = opts, .user_data = user_data }), + .realtime => self.rq.schedule(.{ .ns = ns, .start = if (!opts.abs) RealtimeQueue.now() catch unreachable else 0, .opts = opts, .user_data = user_data }), + }; + } + + pub fn disarm(self: *@This(), clock: Clock, user_data: usize) void { + switch (clock) { + .monotonic => self.mq.disarm(user_data), + .boottime => self.bq.disarm(user_data), + .realtime => self.rq.disarm(user_data), + } + } +}; + +const LinuxTimerQueue = struct { + const Context = struct { + fd: std.posix.fd_t, + repeat: ?u32, + closure: Closure, + }; + + allocator: std.mem.Allocator, + fds: std.AutoHashMapUnmanaged(usize, Context) = .{}, + efd: std.posix.fd_t, + epoll: std.posix.fd_t, + mutex: std.Thread.Mutex = .{}, + thread: ?std.Thread = null, + + pub fn init(allocator: std.mem.Allocator) !@This() { + const epoll = try std.posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC); + errdefer std.posix.close(epoll); + const efd = try std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC | std.os.linux.EFD.NONBLOCK); + errdefer std.posix.close(efd); + var ev: std.os.linux.epoll_event = .{ .data = .{ .ptr = 0xDEADBEEF }, .events = std.os.linux.EPOLL.IN }; + std.posix.epoll_ctl(epoll, std.os.linux.EPOLL.CTL_ADD, efd, &ev) catch |err| return switch (err) { + error.FileDescriptorAlreadyPresentInSet => unreachable, + error.OperationCausesCircularLoop => unreachable, + error.FileDescriptorNotRegistered => unreachable, + error.FileDescriptorIncompatibleWithEpoll => unreachable, + else => |e| e, + }; + return .{ + .allocator = allocator, + .efd = efd, + .epoll = epoll, + }; + } + + pub fn deinit(self: *@This()) void { + _ = std.posix.write(self.efd, &std.mem.toBytes(@as(u64, 1))) catch unreachable; + if (self.thread) |thrd| thrd.join(); + std.posix.close(self.epoll); + var iter = self.fds.iterator(); + while (iter.next()) |e| std.posix.close(e.value_ptr.fd); + self.fds.deinit(self.allocator); + std.posix.close(self.efd); + self.* = undefined; + } + + pub fn schedule(self: *@This(), clock: Clock, ns: u128, user_data: usize, opts: TimeoutOptions) !void { + if (opts.repeat != null and opts.abs) unreachable; // repeats can't be used with abs + const fd = switch (clock) { + .monotonic => std.posix.timerfd_create(std.posix.CLOCK.MONOTONIC, .{ .CLOEXEC = true, .NONBLOCK = true }), + .boottime => std.posix.timerfd_create(std.posix.CLOCK.BOOTTIME, 
+
+const LinuxTimerQueue = struct {
+    const Context = struct {
+        fd: std.posix.fd_t,
+        repeat: ?u32,
+        closure: Closure,
+    };
+
+    allocator: std.mem.Allocator,
+    fds: std.AutoHashMapUnmanaged(usize, Context) = .{},
+    efd: std.posix.fd_t,
+    epoll: std.posix.fd_t,
+    mutex: std.Thread.Mutex = .{},
+    thread: ?std.Thread = null,
+
+    pub fn init(allocator: std.mem.Allocator) !@This() {
+        const epoll = try std.posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC);
+        errdefer std.posix.close(epoll);
+        const efd = try std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC | std.os.linux.EFD.NONBLOCK);
+        errdefer std.posix.close(efd);
+        var ev: std.os.linux.epoll_event = .{ .data = .{ .ptr = 0xDEADBEEF }, .events = std.os.linux.EPOLL.IN };
+        std.posix.epoll_ctl(epoll, std.os.linux.EPOLL.CTL_ADD, efd, &ev) catch |err| return switch (err) {
+            error.FileDescriptorAlreadyPresentInSet => unreachable,
+            error.OperationCausesCircularLoop => unreachable,
+            error.FileDescriptorNotRegistered => unreachable,
+            error.FileDescriptorIncompatibleWithEpoll => unreachable,
+            else => |e| e,
+        };
+        return .{
+            .allocator = allocator,
+            .efd = efd,
+            .epoll = epoll,
+        };
+    }
+
+    pub fn deinit(self: *@This()) void {
+        _ = std.posix.write(self.efd, &std.mem.toBytes(@as(u64, 1))) catch unreachable;
+        if (self.thread) |thrd| thrd.join();
+        std.posix.close(self.epoll);
+        var iter = self.fds.iterator();
+        while (iter.next()) |e| std.posix.close(e.value_ptr.fd);
+        self.fds.deinit(self.allocator);
+        std.posix.close(self.efd);
+        self.* = undefined;
+    }
+
+    pub fn schedule(self: *@This(), clock: Clock, ns: u128, user_data: usize, opts: TimeoutOptions) !void {
+        if (opts.repeat != null and opts.abs) unreachable; // repeats can't be used with abs
+        const fd = switch (clock) {
+            .monotonic => std.posix.timerfd_create(std.posix.CLOCK.MONOTONIC, .{ .CLOEXEC = true, .NONBLOCK = true }),
+            .boottime => std.posix.timerfd_create(std.posix.CLOCK.BOOTTIME, .{ .CLOEXEC = true, .NONBLOCK = true }),
+            .realtime => std.posix.timerfd_create(std.posix.CLOCK.REALTIME, .{ .CLOEXEC = true, .NONBLOCK = true }),
+        } catch |err| return switch (err) {
+            error.AccessDenied => unreachable,
+            else => |e| e,
+        };
+        errdefer std.posix.close(fd);
+        const ts: std.os.linux.itimerspec = .{
+            .it_value = .{
+                .tv_sec = @intCast(ns / std.time.ns_per_s),
+                .tv_nsec = @intCast(ns % std.time.ns_per_s),
+            },
+            .it_interval = .{ .tv_sec = 0, .tv_nsec = 0 },
+        };
+        std.posix.timerfd_settime(fd, .{ .ABSTIME = opts.abs }, &ts, null) catch |err| return switch (err) {
+            error.Canceled, error.InvalidHandle => unreachable,
+            error.Unexpected => |e| e,
+        };
+        self.mutex.lock();
+        defer self.mutex.unlock();
+        try self.fds.putNoClobber(self.allocator, user_data, .{ .fd = fd, .repeat = opts.repeat, .closure = opts.closure });
+        errdefer _ = self.fds.remove(user_data);
+        var ev: std.os.linux.epoll_event = .{ .data = .{ .ptr = user_data }, .events = std.os.linux.EPOLL.IN };
+        std.posix.epoll_ctl(self.epoll, std.os.linux.EPOLL.CTL_ADD, fd, &ev) catch |err| return switch (err) {
+            error.FileDescriptorAlreadyPresentInSet => unreachable,
+            error.OperationCausesCircularLoop => unreachable,
+            error.FileDescriptorNotRegistered => unreachable,
+            error.FileDescriptorIncompatibleWithEpoll => unreachable,
+            else => |e| e,
+        };
+        if (self.thread == null) try self.start();
+    }
+
+    fn disarmInternal(self: *@This(), _: Clock, user_data: usize, lock: bool) void {
+        if (lock) self.mutex.lock();
+        defer if (lock) self.mutex.unlock();
+        if (self.fds.fetchRemove(user_data)) |e| {
+            var ev: std.os.linux.epoll_event = .{ .data = .{ .ptr = user_data }, .events = std.os.linux.EPOLL.IN };
+            std.posix.epoll_ctl(self.epoll, std.os.linux.EPOLL.CTL_DEL, e.value.fd, &ev) catch unreachable;
+            std.posix.close(e.value.fd);
+        }
+    }
+
+    pub fn disarm(self: *@This(), _: Clock, user_data: usize) void {
+        self.disarmInternal(undefined, user_data, true);
+    }
+
+    fn start(self: *@This()) !void {
+        @setCold(true);
+        if (self.thread) |_| unreachable;
+        self.thread = try std.Thread.spawn(.{ .allocator = self.allocator }, threadMain, .{self});
+    }
+
+    fn threadMain(self: *@This()) void {
+        outer: while (true) {
+            var events: [32]std.os.linux.epoll_event = undefined;
+            const n = std.posix.epoll_wait(self.epoll, &events, -1);
+            for (events[0..n]) |ev| {
+                if (ev.data.ptr == 0xDEADBEEF) {
+                    // quit signal
+                    break :outer;
+                }
+                self.mutex.lock();
+                defer self.mutex.unlock();
+                if (self.fds.get(ev.data.ptr)) |v| {
+                    var exp: usize = undefined;
+                    std.debug.assert(std.posix.read(v.fd, std.mem.asBytes(&exp)) catch unreachable == @sizeOf(usize));
+                    if (v.repeat == null or v.repeat.? <= exp) {
{}", .{ev.data.ptr}); + v.closure.callback(v.closure.context, ev.data.ptr); + self.disarmInternal(undefined, ev.data.ptr, false); + } + } + } + } + } +}; + +const NativeTimerQueue = switch (builtin.target.os.tag) { + .linux => if (options.force_foreign_backend) ForeignTimerQueue else LinuxTimerQueue, + else => ForeignTimerQueue, +}; + +impl: NativeTimerQueue, + +pub fn init(allocator: std.mem.Allocator) !@This() { + return .{ .impl = try NativeTimerQueue.init(allocator) }; +} + +pub fn deinit(self: *@This()) void { + self.impl.deinit(); + self.* = undefined; +} + +pub fn schedule(self: *@This(), clock: Clock, ns: u128, user_data: usize, opts: TimeoutOptions) !void { + return self.impl.schedule(clock, ns, user_data, opts); +} + +pub fn disarm(self: *@This(), clock: Clock, user_data: usize) void { + return self.impl.disarm(clock, user_data); +}