Phase 1: Permission modal dialog, session resume modal, HistoryInput with up/down arrow cycling, remove "You:" echo from chat log, LLM client cleanup on unmount. Phase 2: Smart shell auto-approve using allowed/denied command lists from ToolsConfig, animated thinking spinner with live token count in status bar. Phase 3: /models slash command (list + switch), CLI directory positional argument, JSONL debug logger with rotation. Phase 4: Skills system with SkillsManager, load_skill LLM tool, /skills listing, skill invocation via slash commands, system prompt integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
222 lines
7.5 KiB
Python
222 lines
7.5 KiB
Python
"""Custom Textual widgets for SneakyCode TUI."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from textual.app import ComposeResult
|
|
from textual.containers import Horizontal, Vertical
|
|
from textual.events import Key
|
|
from textual.screen import ModalScreen
|
|
from textual.timer import Timer
|
|
from textual.widgets import Button, Input, Static
|
|
|
|
from rich.text import Text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Modal Dialogs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class PermissionModal(ModalScreen[bool]):
    """Ask the user whether a tool invocation may proceed.

    The screen is dismissed with ``True`` when the user allows the tool and
    with ``False`` when the user denies it. The choice can be made with the
    on-screen buttons or with the ``y`` / ``n`` key bindings.
    """

    BINDINGS = [
        ("y", "allow", "Allow"),
        ("n", "deny", "Deny"),
    ]

    def __init__(self, tool_name: str, description: str) -> None:
        """Store the text shown in the dialog.

        Args:
            tool_name: Name rendered in the title line after ``Tool: ``.
            description: Text rendered as the dialog body.
        """
        super().__init__()
        self._tool_name = tool_name
        self._description = description

    def compose(self) -> ComposeResult:
        """Yield the title, the body, and the Allow/Deny button row."""
        title = f"Tool: {self._tool_name}"
        with Vertical(id="permission-dialog"):
            yield Static(title, classes="modal-title")
            yield Static(self._description, classes="modal-body")
            with Horizontal(classes="modal-buttons"):
                yield Button("Allow (y)", id="allow", variant="success")
                yield Button("Deny (n)", id="deny", variant="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with ``True`` only when the pressed button's id is ``allow``."""
        pressed_id = event.button.id
        self.dismiss(pressed_id == "allow")

    def action_allow(self) -> None:
        """Key-binding action: dismiss with ``True``."""
        self.dismiss(True)

    def action_deny(self) -> None:
        """Key-binding action: dismiss with ``False``."""
        self.dismiss(False)
|
|
|
|
|
|
class SessionResumeModal(ModalScreen[bool]):
    """Ask the user whether to resume a previous session.

    The screen is dismissed with ``True`` to resume and with ``False`` to
    start fresh. The choice can be made with the on-screen buttons or with
    the ``y`` / ``n`` key bindings.
    """

    BINDINGS = [
        ("y", "resume", "Resume"),
        ("n", "fresh", "Start Fresh"),
    ]

    def __init__(self, msg_count: int) -> None:
        """Store the message count shown in the dialog body.

        Args:
            msg_count: Number of messages reported for the previous session.
        """
        super().__init__()
        self._msg_count = msg_count

    def compose(self) -> ComposeResult:
        """Yield the title, the summary line, and the Resume/Start Fresh buttons."""
        summary = f"Found previous session with {self._msg_count} messages."
        # The container id matches the one used by the permission dialog;
        # presumably so both dialogs share the same styling — verify in the CSS.
        with Vertical(id="permission-dialog"):
            yield Static("Resume Session?", classes="modal-title")
            yield Static(summary, classes="modal-body")
            with Horizontal(classes="modal-buttons"):
                yield Button("Resume (y)", id="resume", variant="success")
                yield Button("Start Fresh (n)", id="fresh", variant="warning")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with ``True`` only when the pressed button's id is ``resume``."""
        pressed_id = event.button.id
        self.dismiss(pressed_id == "resume")

    def action_resume(self) -> None:
        """Key-binding action: dismiss with ``True``."""
        self.dismiss(True)

    def action_fresh(self) -> None:
        """Key-binding action: dismiss with ``False``."""
        self.dismiss(False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Input with History
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class HistoryInput(Input):
    """Input that recalls previously recorded values with the up/down arrows.

    ``_history`` holds recorded values, oldest first. ``_history_index`` is
    ``-1`` while the user is editing a new line and otherwise points at the
    history entry currently shown. ``_draft`` keeps the text that was in the
    input when history browsing started so it can be restored.
    """

    def __init__(self, **kwargs: object) -> None:
        """Forward all keyword arguments to ``Input`` and start with empty history."""
        super().__init__(**kwargs)
        self._history: list[str] = []
        self._history_index: int = -1
        self._draft: str = ""

    def record(self, value: str) -> None:
        """Record a submitted value and leave history-browsing mode.

        Whitespace-only values and values equal to the most recent entry are
        not stored. The browsing index and the saved draft are reset either way.
        """
        is_blank = not value.strip()
        repeats_last = bool(self._history) and self._history[-1] == value
        if not (is_blank or repeats_last):
            self._history.append(value)
        self._history_index = -1
        self._draft = ""

    def on_key(self, event: Key) -> None:
        """Step through history on ``up`` / ``down``; ignore every other key."""
        key = event.key
        browsing = self._history_index != -1

        if key == "up" and self._history:
            event.prevent_default()
            if not browsing:
                # First step back: remember what the user was typing.
                self._draft = self.value
                self._history_index = len(self._history) - 1
            elif self._history_index > 0:
                self._history_index -= 1
            # At the oldest entry the index stays put and the entry is re-shown.
            recalled = self._history[self._history_index]
        elif key == "down" and browsing:
            event.prevent_default()
            following = self._history_index + 1
            if following < len(self._history):
                self._history_index = following
                recalled = self._history[following]
            else:
                # Stepped past the newest entry: back to the saved draft.
                self._history_index = -1
                recalled = self._draft
        else:
            return

        self.value = recalled
        self.cursor_position = len(self.value)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status Bar
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class StatusBar(Static):
    """Single-line status bar showing token usage, iteration count, and streaming spinner."""

    # Spinner animation frames; one character is shown per tick.
    _SPINNER = "\u280b\u2819\u2839\u2838\u283c\u2834\u2826\u2827\u2807\u280f"

    DEFAULT_CSS = """
    StatusBar {
        dock: bottom;
        height: 1;
        background: $surface;
        color: $text-muted;
        padding: 0 2;
    }
    """

    def __init__(self) -> None:
        """Create an empty status bar with all counters at zero."""
        super().__init__("")
        self._tokens: int = 0
        self._budget: int = 0
        self._iteration: int = 0
        self._max_iterations: int = 0
        self._streaming: bool = False
        self._spinner_frame: int = 0
        self._spinner_timer: Timer | None = None
        self._stream_tokens: int = 0

    def update_tokens(self, tokens: int, budget: int) -> None:
        """Update the token usage display.

        The segment is only rendered while ``budget`` is greater than zero.
        """
        self._tokens = tokens
        self._budget = budget
        self._refresh_display()

    def update_iteration(self, iteration: int, max_iterations: int) -> None:
        """Update the iteration count display.

        The segment is only rendered while ``max_iterations`` is greater than zero.
        """
        self._iteration = iteration
        self._max_iterations = max_iterations
        self._refresh_display()

    def start_streaming(self) -> None:
        """Start the animated thinking spinner.

        Resets the spinner frame and the streamed-token count. Calling this
        again while a spinner is already running restarts it.
        """
        # Stop a timer left over from an earlier start; otherwise it would be
        # orphaned here and keep ticking with nothing able to stop it.
        if self._spinner_timer is not None:
            self._spinner_timer.stop()
        self._streaming = True
        self._spinner_frame = 0
        self._stream_tokens = 0
        self._spinner_timer = self.set_interval(0.08, self._tick_spinner)
        self._refresh_display()

    def stop_streaming(self) -> None:
        """Stop the animated thinking spinner and redraw without it."""
        self._streaming = False
        if self._spinner_timer is not None:
            self._spinner_timer.stop()
            self._spinner_timer = None
        self._refresh_display()

    def update_stream_tokens(self, tokens: int) -> None:
        """Update estimated token count during streaming.

        This only stores the value; it is picked up the next time the
        display is rebuilt (for example on the next spinner tick).
        """
        self._stream_tokens = tokens

    def _tick_spinner(self) -> None:
        """Advance the spinner by one frame, wrapping around, and redraw."""
        self._spinner_frame = (self._spinner_frame + 1) % len(self._SPINNER)
        self._refresh_display()

    def _refresh_display(self) -> None:
        """Rebuild the status bar text from the current state."""
        parts: list[str] = []
        if self._streaming:
            spinner = self._SPINNER[self._spinner_frame]
            parts.append(f"{spinner} Thinking")
            if self._stream_tokens > 0:
                parts.append(f"~{self._stream_tokens:,} tokens")
        if self._budget > 0:
            parts.append(f"Tokens: ~{self._tokens:,} / {self._budget:,}")
        if self._max_iterations > 0:
            parts.append(f"Iteration {self._iteration}/{self._max_iterations}")
        self.update(Text(" \u2502 ".join(parts), style="dim"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming Display
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class StreamingStatic(Static):
    """A Static widget that stays mounted but hidden during non-streaming periods.

    During streaming, call show_streaming() to make the widget visible and
    update() to push partial content. When streaming ends, call
    hide_streaming() to conceal the widget and clear its content.

    Visibility is toggled only by adding or removing the "visible" CSS class;
    the style rule attached to that class is not defined in this class.
    """

    def show_streaming(self) -> None:
        """Make the widget visible for streaming by adding the "visible" class."""
        self.add_class("visible")

    def hide_streaming(self) -> None:
        """Hide the widget by removing the "visible" class, then clear its content."""
        self.remove_class("visible")
        self.update("")
|