terminal/tmux: introduce command queue for viewer

pull/9860/head
Mitchell Hashimoto 2025-12-08 07:09:11 -08:00
parent ec5a60a119
commit 86cd489701
No known key found for this signature in database
GPG Key ID: 523D5DC389D273BC
2 changed files with 156 additions and 43 deletions

View File

@ -3,6 +3,7 @@ const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const testing = std.testing;
const assert = @import("../../quirks.zig").inlineAssert;
const CircBuf = @import("../../datastruct/main.zig").CircBuf;
const control = @import("control.zig");
const output = @import("output.zig");
@ -19,6 +20,12 @@ const log = std.log.scoped(.terminal_tmux_viewer);
// in case something breaks in the future we can consider it. We should
// be able to easily unit test all variations seen in the real world.
/// The initial capacity of the command queue. We dynamically resize
/// as necessary so the initial value isn't that important, but if we
/// want to feel good about it we should make it large enough to support
/// our most realistic use cases without resizing.
const COMMAND_QUEUE_INITIAL = 8;
/// A viewer is a tmux control mode client that attempts to create
/// a remote view of a tmux session, including providing the ability to send
/// new input to the session.
@ -40,6 +47,11 @@ pub const Viewer = struct {
/// The current session ID we're attached to.
session_id: usize,
/// The list of commands we've sent that we want to send and wait
/// for a response for. We only send one command at a time just
/// to avoid any possible confusion around ordering.
command_queue: CommandQueue,
/// The windows in the current session.
windows: std.ArrayList(Window),
@ -52,6 +64,8 @@ pub const Viewer = struct {
/// errors on single-action returns, especially those such as `.exit`.
action_single: [1]Action,
pub const CommandQueue = CircBuf(Command, undefined);
pub const Action = union(enum) {
/// Tmux has closed the control mode connection, we should end
/// our viewer session in some way.
@ -111,7 +125,11 @@ pub const Viewer = struct {
///
/// The given allocator is used for all internal state. You must
/// call deinit when you're done with the viewer to free it.
pub fn init(alloc: Allocator) Viewer {
pub fn init(alloc: Allocator) Allocator.Error!Viewer {
// Create our initial command queue
var command_queue: CommandQueue = try .init(alloc, COMMAND_QUEUE_INITIAL);
errdefer command_queue.deinit(alloc);
return .{
.alloc = alloc,
.state = .startup_block,
@ -119,6 +137,7 @@ pub const Viewer = struct {
// until we receive a session-changed notification which will
// set this to a real value.
.session_id = 0,
.command_queue = command_queue,
.windows = .empty,
.action_arena = .{},
.action_single = undefined,
@ -127,6 +146,11 @@ pub const Viewer = struct {
pub fn deinit(self: *Viewer) void {
self.windows.deinit(self.alloc);
{
var it = self.command_queue.iterator(.forward);
while (it.next()) |command| command.deinit(self.alloc);
self.command_queue.deinit(self.alloc);
}
self.action_arena.promote(self.alloc).deinit();
}
@ -155,11 +179,7 @@ pub const Viewer = struct {
.startup_block => self.nextStartupBlock(n),
.startup_session => self.nextStartupSession(n),
.idle => self.nextIdle(n),
// Once we're in the main states, there's a bunch of shared
// logic so we centralize it.
.list_windows => self.nextCommand(n),
.command_queue => self.nextCommand(n),
};
}
@ -209,11 +229,11 @@ pub const Viewer = struct {
.session_changed => |info| {
self.session_id = info.id;
self.state = .list_windows;
return self.singleAction(.{ .command = std.fmt.comptimePrint(
"list-windows -F '{s}'\n",
.{comptime Format.list_windows.comptimeFormat()},
) });
self.state = .command_queue;
return self.singleAction(self.queueCommand(.list_windows) catch {
log.warn("failed to queue command, becoming defunct", .{});
return self.defunct();
});
},
else => return &.{},
@ -237,39 +257,85 @@ pub const Viewer = struct {
self: *Viewer,
n: control.Notification,
) []const Action {
switch (n) {
.enter => unreachable,
// We have to be in a command queue, but the command queue MAY
// be empty. If it is empty, then receivedCommandOutput will
// handle it by ignoring any command output. That's okay!
assert(self.state == .command_queue);
.exit => return self.defunct(),
return switch (n) {
.enter => unreachable,
.exit => self.defunct(),
inline .block_end,
.block_err,
=> |content, tag| switch (self.state) {
.startup_block,
.startup_session,
.idle,
.defunct,
=> unreachable,
.list_windows => {
// Move to defunct on error blocks.
if (comptime tag == .block_err) return self.defunct();
return self.receivedListWindows(content) catch return self.defunct();
},
=> |content, tag| self.receivedCommandOutput(
content,
tag == .block_err,
) catch err: {
log.warn("failed to process command output, becoming defunct", .{});
break :err self.defunct();
},
// TODO: Use exhaustive matching here, determine if we need
// to handle the other cases.
else => return &.{},
else => &.{},
};
}
fn receivedCommandOutput(
self: *Viewer,
content: []const u8,
is_err: bool,
) ![]const Action {
// If we have no pending commands, this is unexpected.
const command = self.command_queue.first() orelse {
log.info("unexpected block output err={}", .{is_err});
return &.{};
};
self.command_queue.deleteOldest(1);
// We always free any memory associated with the command
defer command.deinit(self.alloc);
// We'll use our arena for the return value here so we can
// easily accumulate actions.
var arena = self.action_arena.promote(self.alloc);
defer self.action_arena = arena.state;
_ = arena.reset(.free_all);
const arena_alloc = arena.allocator();
// Build up our actions to start with the next command if
// we have one.
var actions: std.ArrayList(Action) = .empty;
if (self.command_queue.first()) |next_command| {
try actions.append(
arena_alloc,
.{ .command = next_command.string() },
);
}
// Process our command
switch (command.*) {
.user => {},
.list_windows => try self.receivedListWindows(
arena_alloc,
&actions,
content,
),
}
// Our command processing should not change our state
assert(self.state == .command_queue);
return actions.items;
}
fn receivedListWindows(
self: *Viewer,
arena_alloc: Allocator,
actions: *std.ArrayList(Action),
content: []const u8,
) ![]const Action {
assert(self.state == .list_windows);
) !void {
// This stores our new window state from this list-windows output.
var windows: std.ArrayList(Window) = .empty;
errdefer windows.deinit(self.alloc);
@ -299,14 +365,27 @@ pub const Viewer = struct {
self.windows.deinit(self.alloc);
self.windows = windows;
// Go into the idle state
self.state = .idle;
// TODO: Diff with prior window state, dispatch capture-pane
// requests to collect all of the screen contents, other terminal
// state, etc.
return self.singleAction(.{ .windows = self.windows.items });
try actions.append(arena_alloc, .{ .windows = self.windows.items });
}
/// This queues the command at the end of the command queue
/// and returns an action representing the next command that
/// should be run (the head).
///
/// The next command is not removed, because the expectation is
/// that the head of our command list is always sent to tmux.
fn queueCommand(self: *Viewer, command: Command) Allocator.Error!Action {
// Add our command
try self.command_queue.ensureUnusedCapacity(self.alloc, 1);
self.command_queue.appendAssumeCapacity(command);
// Get our first command to send, guaranteed to exist since we
// just appended one.
return .{ .command = self.command_queue.first().?.string() };
}
/// Helper to return a single action. The input action may use the arena
@ -323,7 +402,7 @@ pub const Viewer = struct {
}
};
const State = union(enum) {
const State = enum {
/// We start in this state just after receiving the initial
/// DCS 1000p opening sequence. We wait for an initial
/// begin/end block that is guaranteed to be sent by tmux for
@ -338,13 +417,46 @@ const State = union(enum) {
/// Tmux has closed the control mode connection
defunct,
/// We're waiting on a list-windows response from tmux. This will
/// be used to resynchronize our entire window state.
/// We're sitting on the command queue waiting for command output
/// in the order provided in the `command_queue` field. This field
/// isn't part of the state because it can be queued at any state.
///
/// Precondition: if self.command_queue.len > 0, then the first
/// command in the queue has already been sent to tmux (via a
/// `command` Action). The next output is assumed to be the result
/// of this command.
///
/// To satisfy the above, any transitions INTO this state should
/// send a command Action for the first command in the queue.
command_queue,
};
const Command = union(enum) {
/// List all windows so we can sync our window state.
list_windows,
/// Idle state, we're not actually doing anything right now except
/// waiting for more events from tmux that may change our behavior.
idle,
/// User command. This is a command provided by the user. Since
/// this is user provided, we can't be sure what it is.
user: []const u8,
pub fn deinit(self: Command, alloc: Allocator) void {
return switch (self) {
.list_windows => {},
.user => |v| alloc.free(v),
};
}
/// Returns the command to execute. The memory of the return
/// value is always safe as long as this command value is alive.
pub fn string(self: Command) []const u8 {
return switch (self) {
.list_windows => std.fmt.comptimePrint(
"list-windows -F '{s}'\n",
.{comptime Format.list_windows.comptimeFormat()},
),
.user => |v| v,
};
}
};
/// Format strings used for commands in our viewer.
@ -379,7 +491,7 @@ const Format = struct {
};
test "immediate exit" {
var viewer = Viewer.init(testing.allocator);
var viewer = try Viewer.init(testing.allocator);
defer viewer.deinit();
const actions = viewer.next(.{ .tmux = .exit });
try testing.expectEqual(1, actions.len);
@ -389,7 +501,7 @@ test "immediate exit" {
}
test "initial flow" {
var viewer = Viewer.init(testing.allocator);
var viewer = try Viewer.init(testing.allocator);
defer viewer.deinit();
// First we receive the initial block end

View File

@ -392,7 +392,8 @@ pub const StreamHandler = struct {
assert(self.tmux_viewer == null);
const viewer = try self.alloc.create(terminal.tmux.Viewer);
errdefer self.alloc.destroy(viewer);
viewer.* = .init(self.alloc);
viewer.* = try .init(self.alloc);
errdefer viewer.deinit();
self.tmux_viewer = viewer;
break :tmux;
},