Skip to content

Commit

Permalink
wip non linux support
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jun 19, 2024
1 parent f8ee6d2 commit cdab4fb
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 178 deletions.
6 changes: 6 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});

var opts = b.addOptions();
const fallback = b.option(bool, "fallback", "use fallback event loop") orelse false;
opts.addOption(bool, "fallback", fallback);

const aio = b.addModule("aio", .{
.root_source_file = b.path("src/aio.zig"),
.target = target,
.optimize = optimize,
});
aio.addImport("build_options", opts.createModule());

const coro = b.addModule("coro", .{
.root_source_file = b.path("src/coro.zig"),
Expand Down Expand Up @@ -46,6 +51,7 @@ pub fn build(b: *std.Build) void {
.target = target,
.optimize = optimize,
});
if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule());
if (mod == .coro) tst.root_module.addImport("aio", aio);
const run = b.addRunArtifact(tst);
test_step.dependOn(&run.step);
Expand Down
5 changes: 3 additions & 2 deletions docs/pages/coro-blocking-code.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ fn task(pool: *ThreadPool) !void {
try std.testing.expectEqual(69, ret);
}
var scheduler = try Scheduler.init(std.testing.allocator, .{});
defer scheduler.deinit();
var pool: ThreadPool = .{};
defer pool.deinit(); // pool must always be destroyed before scheduler
try pool.start(std.testing.allocator, 0);
var scheduler = try Scheduler.init(std.testing.allocator, .{});
defer scheduler.deinit();
_ = try scheduler.spawn(task, .{&pool}, .{});
try scheduler.run();
```
Expand Down
7 changes: 5 additions & 2 deletions examples/aio_dynamic.zig
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ pub fn main() !void {
},
});

const ret = try work.complete(.blocking);
var num_work: u16 = 2;
while (num_work > 0) {
const ret = try work.complete(.blocking);
num_work -= ret.num_completed;
}

log.info("{s}", .{buf[0..len]});
log.info("{s}", .{buf2[0..len2]});
log.info("{}", .{ret});
}
4 changes: 3 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
# Zig flake helper
# Check the flake.nix in zig2nix project for more options:
# <https://github.com/Cloudef/zig2nix/blob/master/flake.nix>
env = zig2nix.outputs.zig-env.${system} { zig = zig2nix.outputs.packages.${system}.zig.master.bin; };
env = zig2nix.outputs.zig-env.${system} {
zig = zig2nix.outputs.packages.${system}.zig.master.bin;
};
system-triple = env.lib.zigTripleFromString system;
in with builtins; with env.lib; with env.pkgs.lib; rec {
# nix run .
Expand Down
13 changes: 8 additions & 5 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
//! On linux this is a very shim wrapper around `io_uring`, on other systems there might be more overhead

const std = @import("std");
const build_options = @import("build_options");

pub const InitError = error{
OutOfMemory,
PermissionDenied,
ProcessQuotaExceeded,
SystemQuotaExceeded,
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
SystemResources,
SystemOutdated,
Unexpected,
Expand All @@ -21,7 +22,6 @@ pub const QueueError = error{

pub const CompletionError = error{
CompletionQueueOvercommitted,
SubmissionQueueEntryInvalid,
SystemResources,
Unexpected,
};
Expand Down Expand Up @@ -130,8 +130,11 @@ pub const EventSource = struct {
};

const IO = switch (@import("builtin").target.os.tag) {
.linux => @import("aio/linux.zig"),
else => @compileError("unsupported os"),
.linux => if (build_options.fallback)
@import("aio/fallback.zig")
else
@import("aio/io_uring.zig"),
else => @import("aio/fallback.zig"),
};

const ops = @import("aio/ops.zig");
Expand Down
25 changes: 25 additions & 0 deletions src/aio/common/eventfd.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const std = @import("std");

pub const EventSource = struct {
fd: std.posix.fd_t,

pub inline fn init() !@This() {
return .{
.fd = try std.posix.eventfd(0, std.os.linux.EFD.CLOEXEC),
};
}

pub inline fn deinit(self: *@This()) void {
std.posix.close(self.fd);
self.* = undefined;
}

pub inline fn notify(self: *@This()) void {
_ = std.posix.write(self.fd, &std.mem.toBytes(@as(u64, 1))) catch unreachable;
}

pub inline fn wait(self: *@This()) void {
var v: u64 = undefined;
_ = std.posix.read(self.fd, std.mem.asBytes(&v)) catch unreachable;
}
};
80 changes: 80 additions & 0 deletions src/aio/common/id.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const std = @import("std");

pub fn Pool(T: type, SZ: type) type {
return struct {
pub const Node = union(enum) { free: ?SZ, used: T };
nodes: []Node,
free: ?SZ = null,
num_free: SZ = 0,
num_used: SZ = 0,

pub const Error = error{
OutOfMemory,
};

pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
return .{ .nodes = try allocator.alloc(Node, n) };
}

pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
allocator.free(self.nodes);
self.* = undefined;
}

pub fn empty(self: *@This()) bool {
return self.num_used == self.num_free;
}

pub fn next(self: *@This()) ?SZ {
if (self.free) |fslot| return fslot;
if (self.num_used >= self.nodes.len) return null;
return self.num_used;
}

pub fn add(self: *@This(), item: T) Error!SZ {
if (self.free) |fslot| {
self.free = self.nodes[fslot].free;
self.nodes[fslot] = .{ .used = item };
self.num_free -= 1;
return fslot;
}
if (self.num_used >= self.nodes.len) return error.OutOfMemory;
self.nodes[self.num_used] = .{ .used = item };
defer self.num_used += 1;
return self.num_used;
}

pub fn remove(self: *@This(), slot: SZ) void {
if (self.free) |fslot| {
self.nodes[slot] = .{ .free = fslot };
} else {
self.nodes[slot] = .{ .free = null };
}
self.free = slot;
self.num_free += 1;
}

pub fn get(self: *@This(), slot: SZ) *T {
return &self.nodes[slot].used;
}

pub const Iterator = struct {
items: []Node,
index: SZ = 0,

pub fn next(self: *@This()) *T {
while (self.index < self.items.len) {
defer self.index += 1;
if (self.items[self.index] == .used) {
return &self.items[self.index].used;
}
}
return null;
}
};

pub fn iterator(self: *@This()) Iterator {
return .{ .items = self.nodes[0..self.num_used] };
}
};
}
47 changes: 47 additions & 0 deletions src/aio/common/posix.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const std = @import("std");

pub const RENAME_NOREPLACE = 1 << 0;

pub fn convertOpenFlags(flags: std.fs.File.OpenFlags) std.posix.O {
var os_flags: std.posix.O = .{
.ACCMODE = switch (flags.mode) {
.read_only => .RDONLY,
.write_only => .WRONLY,
.read_write => .RDWR,
},
};
if (@hasField(std.posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
if (@hasField(std.posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
if (@hasField(std.posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;

// Use the O locking flags if the os supports them to acquire the lock
// atomically.
const has_flock_open_flags = @hasField(std.posix.O, "EXLOCK");
if (has_flock_open_flags) {
// Note that the NONBLOCK flag is removed after the openat() call
// is successful.
switch (flags.lock) {
.none => {},
.shared => {
os_flags.SHLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
.exclusive => {
os_flags.EXLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
}
}
return os_flags;
}

pub fn statusToTerm(status: u32) std.process.Child.Term {
return if (std.posix.W.IFEXITED(status))
.{ .Exited = std.posix.W.EXITSTATUS(status) }
else if (std.posix.W.IFSIGNALED(status))
.{ .Signal = std.posix.W.TERMSIG(status) }
else if (std.posix.W.IFSTOPPED(status))
.{ .Stopped = std.posix.W.STOPSIG(status) }
else
.{ .Unknown = status };
}
Loading

0 comments on commit cdab4fb

Please sign in to comment.