From b84f9e58a6fd71328f55f994f8775f81f9849a08 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Tue, 25 Jun 2024 16:27:50 +0900 Subject: [PATCH] vaxis: add aio event loop and example disabled by default, aio/coro might not be that mature yet. however, appreciated if people give it a go and report issues. --- build.zig | 11 ++ build.zig.zon | 5 + examples/aio.zig | 171 ++++++++++++++++++++++++++++++ src/aio.zig | 268 +++++++++++++++++++++++++++++++++++++++++++++++ src/main.zig | 1 + 5 files changed, 456 insertions(+) create mode 100644 examples/aio.zig create mode 100644 src/aio.zig diff --git a/build.zig b/build.zig index b51b512..78c58ec 100644 --- a/build.zig +++ b/build.zig @@ -4,11 +4,13 @@ pub fn build(b: *std.Build) void { const include_libxev = b.option(bool, "libxev", "Enable support for libxev library (default: true)") orelse true; const include_images = b.option(bool, "images", "Enable support for images (default: true)") orelse true; const include_text_input = b.option(bool, "text_input", "Enable support for the TextInput widget (default: true)") orelse true; + const include_aio = b.option(bool, "aio", "Enable support for zig-aio library (default: false)") orelse false; const options = b.addOptions(); options.addOption(bool, "libxev", include_libxev); options.addOption(bool, "images", include_images); options.addOption(bool, "text_input", include_text_input); + options.addOption(bool, "aio", include_aio); const options_mod = options.createModule(); @@ -33,6 +35,10 @@ pub fn build(b: *std.Build) void { .optimize = optimize, .target = target, }) else null; + const aio_dep = if (include_aio) b.lazyDependency("aio", .{ + .optimize = optimize, + .target = target, + }) else null; // Module const vaxis_mod = b.addModule("vaxis", .{ @@ -46,6 +52,8 @@ pub fn build(b: *std.Build) void { if (zigimg_dep) |dep| vaxis_mod.addImport("zigimg", dep.module("zigimg")); if (gap_buffer_dep) |dep| vaxis_mod.addImport("gap_buffer", dep.module("gap_buffer")); if (xev_dep) |dep| vaxis_mod.addImport("xev", dep.module("xev")); + if (aio_dep) |dep| vaxis_mod.addImport("aio", dep.module("aio")); + if (aio_dep) |dep| vaxis_mod.addImport("coro", dep.module("coro")); vaxis_mod.addImport("build_options", options_mod); // Examples @@ -59,6 +67,7 @@ pub fn build(b: *std.Build) void { vaxis, vt, xev, + aio, }; const example_option = b.option(Example, "example", "Example to run (default: text_input)") orelse .text_input; const example_step = b.step("example", "Run example"); @@ -73,6 +82,8 @@ pub fn build(b: *std.Build) void { }); example.root_module.addImport("vaxis", vaxis_mod); if (xev_dep) |dep| example.root_module.addImport("xev", dep.module("xev")); + if (aio_dep) |dep| example.root_module.addImport("aio", dep.module("aio")); + if (aio_dep) |dep| example.root_module.addImport("coro", dep.module("coro")); const example_run = b.addRunArtifact(example); example_step.dependOn(&example_run.step); diff --git a/build.zig.zon b/build.zig.zon index 2449903..f3f9b2d 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -22,6 +22,11 @@ .hash = "12207b7a5b538ffb7fb18f954ae17d2f8490b6e3778a9e30564ad82c58ee8da52361", .lazy = true, }, + .aio = .{ + .url = "git+https://github.com/Cloudef/zig-aio#be8e2b374bf223202090e282447fa4581029c2eb", + .hash = "122012a11b37a350395a32fdb514e57ff54a0f9d8d4ce09498b6c45ffb7211232920", + .lazy = true, + }, }, .paths = .{ "LICENSE", diff --git a/examples/aio.zig b/examples/aio.zig new file mode 100644 index 0000000..4ea9ed8 --- /dev/null +++ b/examples/aio.zig @@ -0,0 +1,171 @@ +const builtin = @import("builtin"); +const std = @import("std"); +const vaxis = @import("vaxis"); +const aio = @import("aio"); +const coro = @import("coro"); + +pub const panic = vaxis.panic_handler; + +const Event = union(enum) { + key_press: vaxis.Key, + winsize: vaxis.Winsize, +}; + +const Loop = vaxis.aio.Loop(Event); + +const Video = enum { no_state, ready, end }; +const Audio = enum { no_state, ready, end }; + +fn downloadTask(allocator: std.mem.Allocator, url: []const u8) ![]const u8 { + var client: std.http.Client = .{ .allocator = allocator }; + defer client.deinit(); + var body = std.ArrayList(u8).init(allocator); + _ = try client.fetch(.{ + .location = .{ .url = url }, + .response_storage = .{ .dynamic = &body }, + .max_append_size = 1.6e+7, + }); + return try body.toOwnedSlice(); +} + +fn audioTask(allocator: std.mem.Allocator) !void { + errdefer coro.yield(Audio.end) catch {}; + + // var child = std.process.Child.init(&.{ "aplay", "-Dplug:default", "-q", "-f", "S16_LE", "-r", "8000" }, allocator); + var child = std.process.Child.init(&.{ "mpv", "--audio-samplerate=16000", "--audio-channels=mono", "--audio-format=s16", "-" }, allocator); + child.stdin_behavior = .Pipe; + child.stdout_behavior = .Ignore; + child.stderr_behavior = .Ignore; + child.spawn() catch return; // no sound + defer _ = child.kill() catch {}; + + const sound = blk: { + var tpool: coro.ThreadPool = .{}; + try tpool.start(allocator, 1); + defer tpool.deinit(); + break :blk try tpool.yieldForCompletition(downloadTask, .{ allocator, "https://keroserene.net/lol/roll.s16" }); + }; + defer allocator.free(sound); + + try coro.yield(Audio.ready); + + var audio_off: usize = 0; + while (audio_off < sound.len) { + var written: usize = 0; + try coro.io.single(aio.Write{ .file = child.stdin.?, .buffer = sound[audio_off..], .out_written = &written }); + audio_off += written; + } + + // the audio is already fed to the player and the defer + // would kill the child stay here chilling + coro.yield(Audio.end) catch {}; +} + +fn videoTask(writer: std.io.AnyWriter) !void { + defer coro.yield(Video.end) catch {}; + + var socket: std.posix.socket_t = undefined; + try coro.io.single(aio.Socket{ + .domain = std.posix.AF.INET, + .flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, + .protocol = std.posix.IPPROTO.TCP, + .out_socket = &socket, + }); + defer std.posix.close(socket); + + const address = std.net.Address.initIp4(.{ 44, 224, 41, 160 }, 1987); + try coro.io.single(aio.Connect{ + .socket = socket, + .addr = &address.any, + .addrlen = address.getOsSockLen(), + }); + + try coro.yield(Video.ready); + + var buf: [1024]u8 = undefined; + while (true) { + var read: usize = 0; + try coro.io.single(aio.Recv{ .socket = socket, .buffer = &buf, .out_read = &read }); + if (read == 0) break; + _ = try writer.write(buf[0..read]); + } +} + +fn loadingTask(vx: *vaxis.Vaxis, writer: std.io.AnyWriter) !void { + var color_idx: u8 = 30; + var dir: enum { up, down } = .up; + + while (true) { + try coro.io.single(aio.Timeout{ .ns = 8 * std.time.ns_per_ms }); + + const style: vaxis.Style = .{ .fg = .{ .rgb = [_]u8{ color_idx, color_idx, color_idx } } }; + const segment: vaxis.Segment = .{ .text = vaxis.logo, .style = style }; + + const win = vx.window(); + win.clear(); + + var loc = vaxis.widgets.alignment.center(win, 28, 4); + _ = try loc.printSegment(segment, .{ .wrap = .grapheme }); + + switch (dir) { + .up => { + color_idx += 1; + if (color_idx == 255) dir = .down; + }, + .down => { + color_idx -= 1; + if (color_idx == 30) dir = .up; + }, + } + + try vx.render(writer); + } +} + +pub fn main() !void { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var tty = try vaxis.Tty.init(); + defer tty.deinit(); + + var vx = try vaxis.init(allocator, .{}); + defer vx.deinit(allocator, tty.anyWriter()); + + var scheduler = try coro.Scheduler.init(allocator, .{}); + defer scheduler.deinit(); + + var loop = try Loop.init(); + try loop.spawn(&scheduler, &vx, &tty, null, .{}); + defer loop.deinit(&vx, &tty); + + try vx.enterAltScreen(tty.anyWriter()); + try vx.queryTerminalSend(tty.anyWriter()); + + var buffered_tty_writer = tty.bufferedWriter(); + const loading = try scheduler.spawn(loadingTask, .{ &vx, buffered_tty_writer.writer().any() }, .{}); + const audio = try scheduler.spawn(audioTask, .{allocator}, .{}); + const video = try scheduler.spawn(videoTask, .{buffered_tty_writer.writer().any()}, .{}); + + main: while (try scheduler.tick(.blocking) > 0) { + while (try loop.popEvent()) |event| switch (event) { + .key_press => |key| { + if (key.matches('c', .{ .ctrl = true })) { + break :main; + } + }, + .winsize => |ws| try vx.resize(allocator, buffered_tty_writer.writer().any(), ws), + }; + + if (audio.state(Video) == .ready and video.state(Audio) == .ready) { + loading.cancel(); + audio.wakeup(); + video.wakeup(); + } else if (audio.state(Audio) == .end and video.state(Video) == .end) { + break :main; + } + + try buffered_tty_writer.flush(); + } +} diff --git a/src/aio.zig b/src/aio.zig new file mode 100644 index 0000000..ee1733e --- /dev/null +++ b/src/aio.zig @@ -0,0 +1,268 @@ +const builtin = @import("builtin"); +const std = @import("std"); +const aio = @import("aio"); +const coro = @import("coro"); +const vaxis = @import("main.zig"); +const log = std.log.scoped(.vaxis_aio); + +comptime { + if (builtin.target.os.tag == .windows) { + @compileError("Windows is not supported right now"); + } +} + +const Yield = enum { no_state, took_event }; + +/// zig-aio based event loop +/// +pub fn Loop(comptime T: type) type { + return struct { + const Event = T; + + winsize_task: ?coro.Task.Generic2(winsizeTask) = null, + reader_task: ?coro.Task.Generic2(ttyReaderTask) = null, + queue: std.BoundedArray(T, 512) = .{}, + source: aio.EventSource, + fatal: bool = false, + + pub fn init() !@This() { + return .{ .source = try aio.EventSource.init() }; + } + + pub fn deinit(self: *@This(), vx: *vaxis.Vaxis, tty: *vaxis.Tty) void { + vx.deviceStatusReport(tty.anyWriter()) catch {}; + if (self.winsize_task) |task| task.cancel(); + if (self.reader_task) |task| task.cancel(); + self.source.deinit(); + self.* = undefined; + } + + fn winsizeInner(self: *@This(), tty: *vaxis.Tty) !void { + const Context = struct { + loop: *@TypeOf(self.*), + tty: *vaxis.Tty, + winsize: ?vaxis.Winsize = null, + fn cb(ptr: *anyopaque) void { + std.debug.assert(coro.current() == null); + const ctx: *@This() = @ptrCast(@alignCast(ptr)); + ctx.winsize = vaxis.Tty.getWinsize(ctx.tty.fd) catch return; + ctx.loop.source.notify(); + } + }; + + // keep on stack + var ctx: Context = .{ .loop = self, .tty = tty }; + if (@hasField(Event, "winsize")) { + const handler: vaxis.Tty.SignalHandler = .{ .context = &ctx, .callback = Context.cb }; + try vaxis.Tty.notifyWinsize(handler); + } + + while (true) { + try coro.io.single(aio.WaitEventSource{ .source = &self.source }); + if (ctx.winsize) |winsize| { + if (!@hasField(Event, "winsize")) unreachable; + ctx.loop.postEvent(.{ .winsize = winsize }) catch {}; + ctx.winsize = null; + } + } + } + + fn winsizeTask(self: *@This(), tty: *vaxis.Tty) void { + self.winsizeInner(tty) catch |err| { + if (err != error.Canceled) log.err("winsize: {}", .{err}); + self.fatal = true; + }; + } + + fn ttyReaderInner(self: *@This(), vx: *vaxis.Vaxis, tty: *vaxis.Tty, paste_allocator: ?std.mem.Allocator) !void { + // initialize a grapheme cache + var cache: vaxis.GraphemeCache = .{}; + + // get our initial winsize + const winsize = try vaxis.Tty.getWinsize(tty.fd); + if (@hasField(Event, "winsize")) { + try self.postEvent(.{ .winsize = winsize }); + } + + var parser: vaxis.Parser = .{ + .grapheme_data = &vx.unicode.grapheme_data, + }; + + const file: std.fs.File = .{ .handle = tty.fd }; + while (true) { + var buf: [4096]u8 = undefined; + var n: usize = undefined; + var read_start: usize = 0; + try coro.io.single(aio.Read{ .file = file, .buffer = buf[read_start..], .out_read = &n }); + var seq_start: usize = 0; + while (seq_start < n) { + const result = try parser.parse(buf[seq_start..n], paste_allocator); + if (result.n == 0) { + // copy the read to the beginning. We don't use memcpy because + // this could be overlapping, and it's also rare + const initial_start = seq_start; + while (seq_start < n) : (seq_start += 1) { + buf[seq_start - initial_start] = buf[seq_start]; + } + read_start = seq_start - initial_start + 1; + continue; + } + read_start = 0; + seq_start += result.n; + + const event = result.event orelse continue; + switch (event) { + .key_press => |key| { + if (@hasField(Event, "key_press")) { + // HACK: yuck. there has to be a better way + var mut_key = key; + if (key.text) |text| { + mut_key.text = cache.put(text); + } + try self.postEvent(.{ .key_press = mut_key }); + } + }, + .key_release => |*key| { + if (@hasField(Event, "key_release")) { + // HACK: yuck. there has to be a better way + var mut_key = key; + if (key.text) |text| { + mut_key.text = cache.put(text); + } + try self.postEvent(.{ .key_release = mut_key }); + } + }, + .mouse => |mouse| { + if (@hasField(Event, "mouse")) { + try self.postEvent(.{ .mouse = vx.translateMouse(mouse) }); + } + }, + .focus_in => { + if (@hasField(Event, "focus_in")) { + try self.postEvent(.focus_in); + } + }, + .focus_out => { + if (@hasField(Event, "focus_out")) { + try self.postEvent(.focus_out); + } + }, + .paste_start => { + if (@hasField(Event, "paste_start")) { + try self.postEvent(.paste_start); + } + }, + .paste_end => { + if (@hasField(Event, "paste_end")) { + try self.postEvent(.paste_end); + } + }, + .paste => |text| { + if (@hasField(Event, "paste")) { + try self.postEvent(.{ .paste = text }); + } else { + if (paste_allocator) |_| + paste_allocator.?.free(text); + } + }, + .color_report => |report| { + if (@hasField(Event, "color_report")) { + try self.postEvent(.{ .color_report = report }); + } + }, + .color_scheme => |scheme| { + if (@hasField(Event, "color_scheme")) { + try self.postEvent(.{ .color_scheme = scheme }); + } + }, + .cap_kitty_keyboard => { + log.info("kitty keyboard capability detected", .{}); + vx.caps.kitty_keyboard = true; + }, + .cap_kitty_graphics => { + if (!vx.caps.kitty_graphics) { + log.info("kitty graphics capability detected", .{}); + vx.caps.kitty_graphics = true; + } + }, + .cap_rgb => { + log.info("rgb capability detected", .{}); + vx.caps.rgb = true; + }, + .cap_unicode => { + log.info("unicode capability detected", .{}); + vx.caps.unicode = .unicode; + vx.screen.width_method = .unicode; + }, + .cap_sgr_pixels => { + log.info("pixel mouse capability detected", .{}); + vx.caps.sgr_pixels = true; + }, + .cap_color_scheme_updates => { + log.info("color_scheme_updates capability detected", .{}); + vx.caps.color_scheme_updates = true; + }, + .cap_da1 => { + std.Thread.Futex.wake(&vx.query_futex, 10); + }, + .winsize => unreachable, // handled elsewhere for posix + } + } + } + } + + fn ttyReaderTask(self: *@This(), vx: *vaxis.Vaxis, tty: *vaxis.Tty, paste_allocator: ?std.mem.Allocator) void { + self.ttyReaderInner(vx, tty, paste_allocator) catch |err| { + if (err != error.Canceled) log.err("ttyReader: {}", .{err}); + self.fatal = true; + }; + } + + /// Spawns tasks to handle winsize signal and tty + pub fn spawn( + self: *@This(), + scheduler: *coro.Scheduler, + vx: *vaxis.Vaxis, + tty: *vaxis.Tty, + paste_allocator: ?std.mem.Allocator, + spawn_options: coro.Scheduler.SpawnOptions, + ) coro.Scheduler.SpawnError!void { + if (self.reader_task) |_| unreachable; // programming error + // This is required even if app doesn't care about winsize + // It is because it consumes the EventSource, so it can wakeup the scheduler + // Without that custom `postEvent`'s wouldn't wake up the scheduler and UI wouldn't update + self.winsize_task = try scheduler.spawn(winsizeTask, .{ self, tty }, spawn_options); + self.reader_task = try scheduler.spawn(ttyReaderTask, .{ self, vx, tty, paste_allocator }, spawn_options); + } + + pub const PopEventError = error{TtyCommunicationSevered}; + + /// Call this in a while loop in the main event handler until it returns null + pub fn popEvent(self: *@This()) PopEventError!?T { + if (self.fatal) return error.TtyCommunicationSevered; + defer self.winsize_task.?.wakeupIf(Yield.took_event); + defer self.reader_task.?.wakeupIf(Yield.took_event); + return self.queue.popOrNull(); + } + + pub const PostEventError = error{Overflow}; + + pub fn postEvent(self: *@This(), event: T) !void { + if (coro.current()) |_| { + while (true) { + self.queue.insert(0, event) catch { + // wait for the app to take event + try coro.yield(Yield.took_event); + continue; + }; + break; + } + } else { + // queue can be full, app could handle this error by spinning the scheduler + try self.queue.insert(0, event); + } + // wakes up the scheduler, so custom events update UI + self.source.notify(); + } + }; +} diff --git a/src/main.zig b/src/main.zig index 4d8ba4f..d253ba9 100644 --- a/src/main.zig +++ b/src/main.zig @@ -6,6 +6,7 @@ pub const Vaxis = @import("Vaxis.zig"); pub const Loop = @import("Loop.zig").Loop; pub const xev = @import("xev.zig"); +pub const aio = @import("aio.zig"); pub const Queue = @import("queue.zig").Queue; pub const Key = @import("Key.zig");