diff --git a/src/datastruct/blocking_queue.zig b/src/datastruct/blocking_queue.zig index 06bc8267f..c95b6b96a 100644 --- a/src/datastruct/blocking_queue.zig +++ b/src/datastruct/blocking_queue.zig @@ -73,7 +73,7 @@ pub fn BlockingQueue( not_full_waiters: usize = 0, /// Allocate the blocking queue on the heap. - pub fn create(alloc: Allocator) !*Self { + pub fn create(alloc: Allocator) Allocator.Error!*Self { const ptr = try alloc.create(Self); errdefer alloc.destroy(ptr); diff --git a/src/terminal/search/Thread.zig b/src/terminal/search/Thread.zig index b9f98a9dc..38a092a91 100644 --- a/src/terminal/search/Thread.zig +++ b/src/terminal/search/Thread.zig @@ -9,8 +9,13 @@ pub const Thread = @This(); const std = @import("std"); +const builtin = @import("builtin"); +const testing = std.testing; const Allocator = std.mem.Allocator; +const xev = @import("../../global.zig").xev; +const internal_os = @import("../../os/main.zig"); const BlockingQueue = @import("../../datastruct/main.zig").BlockingQueue; +const Terminal = @import("../Terminal.zig"); const log = std.log.scoped(.search_thread); @@ -21,23 +26,57 @@ alloc: std.mem.Allocator, /// this is a blocking queue so if it is full you will get errors (or block). mailbox: *Mailbox, +/// The event loop for the search thread. +loop: xev.Loop, + +/// This can be used to wake up the renderer and force a render safely from +/// any thread. +wakeup: xev.Async, +wakeup_c: xev.Completion = .{}, + +/// This can be used to stop the thread on the next loop iteration. +stop: xev.Async, +stop_c: xev.Completion = .{}, + +/// The options used to initialize this thread. +opts: Options, + /// Initialize the thread. This does not START the thread. This only sets /// up all the internal state necessary prior to starting the thread. It /// is up to the caller to start the thread with the threadMain entrypoint. -pub fn init(alloc: Allocator) Thread { +pub fn init(alloc: Allocator, opts: Options) !Thread { // The mailbox for messaging this thread var mailbox = try Mailbox.create(alloc); errdefer mailbox.destroy(alloc); + // Create our event loop. + var loop = try xev.Loop.init(.{}); + errdefer loop.deinit(); + + // This async handle is used to "wake up" the renderer and force a render. + var wakeup_h = try xev.Async.init(); + errdefer wakeup_h.deinit(); + + // This async handle is used to stop the loop and force the thread to end. + var stop_h = try xev.Async.init(); + errdefer stop_h.deinit(); + return .{ .alloc = alloc, .mailbox = mailbox, + .loop = loop, + .wakeup = wakeup_h, + .stop = stop_h, + .opts = opts, }; } /// Clean up the thread. This is only safe to call once the thread /// completes executing; the caller must join prior to this. pub fn deinit(self: *Thread) void { + self.wakeup.deinit(); + self.stop.deinit(); + self.loop.deinit(); // Nothing can possibly access the mailbox anymore, destroy it. self.mailbox.destroy(self.alloc); } @@ -53,11 +92,110 @@ pub fn threadMain(self: *Thread) void { fn threadMain_(self: *Thread) !void { defer log.debug("search thread exited", .{}); - _ = self; + + // Right now, on Darwin, `std.Thread.setName` can only name the current + // thread, and we have no way to get the current thread from within it, + // so instead we use this code to name the thread instead. + if (comptime builtin.os.tag.isDarwin()) { + internal_os.macos.pthread_setname_np(&"search".*); + + // We can run with lower priority than other threads. + const class: internal_os.macos.QosClass = .utility; + if (internal_os.macos.setQosClass(class)) { + log.debug("thread QoS class set class={}", .{class}); + } else |err| { + log.warn("error setting QoS class err={}", .{err}); + } + } + + // Start the async handlers + self.wakeup.wait(&self.loop, &self.wakeup_c, Thread, self, wakeupCallback); + self.stop.wait(&self.loop, &self.stop_c, Thread, self, stopCallback); + + // Send an initial wakeup so we drain our mailbox immediately. + try self.wakeup.notify(); + + // Run + log.debug("starting search thread", .{}); + defer log.debug("starting search thread shutdown", .{}); + _ = try self.loop.run(.until_done); } +/// Drain the mailbox. +fn drainMailbox(self: *Thread) !void { + while (self.mailbox.pop()) |message| { + log.debug("mailbox message={}", .{message}); + } +} + +fn wakeupCallback( + self_: ?*Thread, + _: *xev.Loop, + _: *xev.Completion, + r: xev.Async.WaitError!void, +) xev.CallbackAction { + _ = r catch |err| { + log.warn("error in wakeup err={}", .{err}); + return .rearm; + }; + + const self = self_.?; + + // When we wake up, we drain the mailbox. Mailbox producers should + // wake up our thread after publishing. + self.drainMailbox() catch |err| + log.warn("error draining mailbox err={}", .{err}); + + return .rearm; +} + +fn stopCallback( + self_: ?*Thread, + _: *xev.Loop, + _: *xev.Completion, + r: xev.Async.WaitError!void, +) xev.CallbackAction { + _ = r catch unreachable; + self_.?.loop.stop(); + return .disarm; +} + +pub const Options = struct { + /// Mutex that must be held while reading/writing the terminal. + mutex: *std.Thread.Mutex, + + /// The terminal data to search. + terminal: *Terminal, +}; + /// The type used for sending messages to the thread. pub const Mailbox = BlockingQueue(Message, 64); /// The messages that can be sent to the thread. -pub const Message = union(enum) {}; +pub const Message = union(enum) { + /// Change the search term. If no prior search term is given this + /// will start a search. If an existing search term is given this will + /// stop the prior search and start a new one. + change_needle: []const u8, +}; + +test { + const alloc = testing.allocator; + var mutex: std.Thread.Mutex = .{}; + var t: Terminal = try .init(alloc, .{ .cols = 10, .rows = 2 }); + defer t.deinit(alloc); + + var thread: Thread = try .init(alloc, .{ + .mutex = &mutex, + .terminal = &t, + }); + defer thread.deinit(); + + var os_thread = try std.Thread.spawn( + .{}, + threadMain, + .{&thread}, + ); + try thread.stop.notify(); + os_thread.join(); +}