Files
SneakyCode/app/ui/app.py
Phillip Tarrant 638aecb561 feat: add custom HeaderPanel widget and switchable agent modes
Replace built-in Header with a custom HeaderPanel showing model name,
mode badge, and live token usage. Add AgentMode enum (normal/plan/auto)
with mode-aware permission gating — Plan mode restricts to read-only
tools, Auto mode auto-approves everything. Includes /mode slash command
and Ctrl+P keybinding to cycle modes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:36:23 -05:00

439 lines
18 KiB
Python

"""SneakyCode Textual TUI application."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from rich.markdown import Markdown
from rich.panel import Panel
from rich.text import Text
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Input, RichLog
from textual import work
from app.agent.context import SessionContext
from app.agent.loop import AgentLoop
from app.models.config import AgentMode, AppConfig
from app.services.llm import LLMClient
from app.services.permissions import PermissionsService
from app.services.session import SessionManager
from app.services.streaming import StreamHandler
from app.tools.registry import create_default_registry
from app.ui.widgets import (
HeaderPanel,
HistoryInput,
PermissionModal,
SessionResumeModal,
StatusBar,
StreamingStatic,
)
from app.utils.display import DisplayAdapter
from app.utils.logging import get_logger, setup_logging_for_tui
if TYPE_CHECKING:
    # Needed only by annotations, which are lazy strings in this module
    # thanks to `from __future__ import annotations`.
    from textual.worker import Worker

# Module-level logger named after this module.
logger = get_logger(__name__)
class SneakyCodeApp(App):
    """Main TUI application for SneakyCode.

    The app composes a header, a chat log, a streaming preview widget, a
    status bar and a prompt input. Plain input is dispatched to the agent as
    an exclusive background worker; input starting with ``/`` is treated as a
    slash command.
    """

    TITLE = "SneakyCode"
    CSS_PATH = "styles.tcss"

    BINDINGS = [
        Binding("ctrl+c", "cancel_or_quit", "Cancel/Quit", show=False),
        Binding("ctrl+p", "cycle_mode", "Cycle Mode"),
    ]

    # Name given to agent-turn workers; on_worker_state_changed filters on it.
    _AGENT_WORKER_NAME = "agent-turn"

    def __init__(self, config: AppConfig, session_mgr: SessionManager | None = None) -> None:
        """Store configuration; agent components are created in on_mount.

        Args:
            config: Application configuration.
            session_mgr: Optional session manager. When it is None, no resume
                is offered and ``_save_session`` is a no-op.
        """
        super().__init__()
        self._config = config
        self._session_mgr = session_mgr

        # Everything below is populated in on_mount (or stays None when the
        # corresponding feature is disabled).
        self._ctx: SessionContext | None = None
        self._agent: AgentLoop | None = None
        self._client: LLMClient | None = None
        self._tool_registry = None
        self._permissions: PermissionsService | None = None
        self._debug_logger = None
        self._skills_manager = None
        self._skill_runner = None

        # Handle of the running agent turn, and the number of Ctrl+C presses
        # since that turn was started.
        self._current_worker: Worker | None = None
        self._cancel_count = 0

    def compose(self) -> ComposeResult:
        """Yield the widgets that make up the main screen, top to bottom."""
        yield HeaderPanel(model_name=self._config.llm.model)
        yield RichLog(id="chat-log", highlight=True, markup=True)
        yield StreamingStatic("", id="streaming")
        yield StatusBar()
        yield HistoryInput(placeholder="Enter your prompt...")

    async def on_mount(self) -> None:
        """Initialize agent components after the app is mounted."""
        setup_logging_for_tui()
        self._ctx = SessionContext(self._config)

        # Long-lived agent dependencies (reused across turns). The client's
        # async context is entered manually here; on_unmount closes it.
        self._client = LLMClient(self._config.llm)
        await self._client.__aenter__()
        self._permissions = PermissionsService(self._config.permissions, self._config.tools)

        # Debug logger, only when enabled
        if self._config.debug.enabled:
            from app.services.debug_log import DebugLogger

            log_dir = self._config.agent.workspace_root / self._config.debug.log_dir
            self._debug_logger = DebugLogger(log_dir, self._config.debug.max_files)

        # Skills system, only when enabled
        if self._config.skills.enabled:
            from app.services.skills import SkillsManager

            self._skills_manager = SkillsManager(
                self._config.skills, self._config.agent.workspace_root
            )

        # Tool registry (the SkillRunner is wired in after the registry exists)
        self._tool_registry = create_default_registry(
            self._config.agent.workspace_root,
            self._config,
            skills_manager=self._skills_manager,
        )

        if self._skills_manager is not None and self._tool_registry is not None:
            from app.services.skill_runner import SkillRunner

            self._skill_runner = SkillRunner(
                self._skills_manager,
                self._config,
                self._ctx,
                self._tool_registry,
            )
            # Late-bind the runner to the skill tools already in the registry
            for tool_name in ("load_skill", "finish_skill"):
                tool = self._tool_registry.get(tool_name)
                if tool and hasattr(tool, "set_skill_runner"):
                    tool.set_skill_runner(self._skill_runner)

        # Permission prompts are answered through a modal dialog
        async def permission_prompt(tool_name: str, description: str) -> bool:
            return await self._show_permission_modal(tool_name, description)

        self._permissions.set_prompt_callback(permission_prompt)

        # Offer session resume if configured (must run in a worker for push_screen_wait)
        self._offer_session_resume()

    @work
    async def _offer_session_resume(self) -> None:
        """Offer to resume a previous session, running in a worker for modal support.

        Does nothing but focus the prompt input when there is no session
        manager, resume offers are disabled, or no saved session is found.
        """
        if self._session_mgr and self._config.session.offer_resume:
            saved = self._session_mgr.load_latest()
            if saved:
                log = self.query_one("#chat-log", RichLog)
                msg_count = len(saved.messages)
                resume = await self.push_screen_wait(SessionResumeModal(msg_count))
                if resume:
                    self._session_mgr.restore(saved, self._ctx)
                    log.write(Text("Session restored", style="bold green"))
                else:
                    log.write(Text("Starting fresh session", style="cyan"))
        self.query_one(HistoryInput).focus()

    async def on_input_submitted(self, event: Input.Submitted) -> None:
        """Handle user input submission.

        Blank input is ignored. Otherwise the input is cleared, recorded in
        the input history, echoed to the chat log, and either handled as a
        slash command or dispatched as an agent turn.
        """
        user_input = event.value.strip()
        if not user_input:
            return

        event.input.clear()
        event.input.record(user_input)

        log = self.query_one("#chat-log", RichLog)

        # Echo user prompt (condensed for multi-line)
        from app.utils.display import render_user_message

        log.write(render_user_message(user_input))

        if user_input.startswith("/"):
            await self._handle_slash_command(user_input, log)
            return

        self._start_agent_turn(user_input)

    def _start_agent_turn(self, prompt: str) -> None:
        """Dispatch ``prompt`` as an exclusive agent-turn worker.

        Resets the Ctrl+C counter so the first Ctrl+C of the new turn cancels
        it rather than quitting the app.
        """
        self._cancel_count = 0
        self._current_worker = self.run_worker(
            self._run_agent_turn(prompt),
            name=self._AGENT_WORKER_NAME,
            exclusive=True,
        )

    async def _handle_slash_command(self, command: str, log: RichLog) -> None:
        """Process slash commands.

        Args:
            command: The full input line, including the leading ``/``.
            log: Chat log that receives command output.

        ``/models`` and ``/mode`` are matched on the first whitespace-separated
        token, so look-alike input such as ``/model x`` or a skill trigger
        starting with "mode" is not captured by the built-in commands.
        Anything that is not a built-in command is tried as a skill name.
        """
        cmd = command.lower()
        parts = command.split(maxsplit=1)
        head = parts[0].lower() if parts else ""
        # The argument keeps its original case (model names are passed through as typed).
        arg = parts[1].strip() if len(parts) > 1 else ""

        if cmd == "/help":
            self._write_help(log)
        elif cmd in ("/quit", "/exit", "/bye"):
            self._save_session()
            self.exit()
        elif cmd == "/clear":
            if self._ctx:
                self._ctx.clear_history()
            log.clear()
            log.write(Text("✓ Conversation history cleared.", style="bold green"))
        elif cmd == "/history":
            if self._ctx:
                from app.utils.display import render_history

                log.write(render_history(self._ctx.get_history()))
        elif cmd == "/save":
            path = self._save_session()
            if path:
                log.write(Text(f"✓ Session saved to {path}", style="bold green"))
            else:
                log.write(Text("✗ No session to save", style="bold red"))
        elif cmd == "/session":
            if self._ctx:
                log.write(Text(
                    f"Messages: {self._ctx.message_count} | "
                    f"Tokens: ~{self._ctx.estimated_tokens:,} / {self._ctx.token_counter.budget:,} | "
                    f"Started: {self._ctx.start_time.isoformat()}",
                    style="cyan",
                ))
        elif head == "/models":
            await self._handle_models_command(arg, log)
        elif head == "/mode":
            self._handle_mode_command(arg, log)
        elif cmd == "/skills":
            self._write_skills(log)
        elif not self._try_skill_command(cmd, log):
            log.write(Text(f"Unknown command: {command}", style="yellow"))

    def _write_help(self, log: RichLog) -> None:
        """Write the table of built-in slash commands to ``log``."""
        from rich.table import Table

        table = Table(title="SneakyCode Commands", show_lines=False)
        table.add_column("Command", style="cyan", no_wrap=True)
        table.add_column("Description")
        rows = (
            ("/help", "Show this help message"),
            ("/quit, /exit, /bye", "Save session and exit"),
            ("/clear", "Clear conversation history"),
            ("/history", "Show conversation history"),
            ("/save", "Manually save session"),
            ("/session", "Show session info (messages, tokens, start time)"),
            ("/models", "List available Ollama models"),
            ("/models <name>", "Switch to a different model"),
            ("/mode", "Show current agent mode"),
            ("/mode normal|plan|auto", "Switch agent mode"),
            ("/skills", "List available skills"),
            ("/<skill>", "Load a skill by name"),
        )
        for name, description in rows:
            table.add_row(name, description)
        log.write(table)

    async def _handle_models_command(self, arg: str, log: RichLog) -> None:
        """List available models (empty ``arg``) or switch to the model named ``arg``."""
        if arg:
            self._config.llm.model = arg
            self.query_one(HeaderPanel).update_model(arg)
            log.write(Text(f"Switched to model: {arg}", style="bold green"))
            return

        if self._client is None:
            log.write(Text("Failed to list models: LLM client is not initialized", style="red"))
            return

        try:
            models = await self._client.list_models()

            from rich.table import Table

            table = Table(title="Available Models", show_lines=False)
            table.add_column("Model", style="cyan")
            table.add_column("Size", style="dim")
            current = self._config.llm.model
            for m in models:
                name = m["name"]
                # Substring match, so a configured name without a tag still
                # marks the tagged entry as active.
                marker = " (active)" if current in name else ""
                table.add_row(f"{name}{marker}", m["size"])
            log.write(table)
        except Exception as e:  # UI boundary: report the failure instead of crashing the app
            log.write(Text(f"Failed to list models: {e}", style="red"))

    def _handle_mode_command(self, arg: str, log: RichLog) -> None:
        """Show the current agent mode (empty ``arg``) or switch to the mode named ``arg``."""
        if self._permissions is None:
            log.write(Text("Agent mode is unavailable: permissions are not initialized", style="yellow"))
            return

        if not arg:
            current = self._permissions.mode
            log.write(Text(f"Current mode: {current.value}", style="cyan"))
            return

        mode_str = arg.lower()
        try:
            new_mode = AgentMode(mode_str)
        except ValueError:
            log.write(Text(f"Unknown mode: {mode_str}. Use normal, plan, or auto.", style="yellow"))
            return

        self._set_mode(new_mode)
        log.write(Text(f"Switched to {new_mode.value} mode", style="bold green"))

    def _set_mode(self, new_mode: AgentMode) -> None:
        """Apply ``new_mode`` to the permissions service and the header badge.

        Callers must have checked that ``self._permissions`` is not None.
        """
        self._permissions.mode = new_mode
        self.query_one(HeaderPanel).update_mode(new_mode)

    def _write_skills(self, log: RichLog) -> None:
        """Write the list of available skills (or a notice) to ``log``."""
        if not self._skills_manager:
            log.write(Text("Skills system is disabled", style="yellow"))
            return

        skills = self._skills_manager.list_skills()
        if not skills:
            log.write(Text("No skills found", style="yellow"))
            return

        from rich.table import Table

        table = Table(title="Available Skills")
        table.add_column("Name", style="cyan")
        table.add_column("Description")
        for s in skills:
            table.add_row(f"/{s.name}", s.description)
        log.write(table)

    def _try_skill_command(self, cmd: str, log: RichLog) -> bool:
        """Try to interpret the lowercased command ``cmd`` as a skill invocation.

        Returns:
            True when a skill was activated or loaded, False when ``cmd``
            matched no skill (the caller then reports an unknown command).
        """
        skill_name = cmd.lstrip("/")

        # Try as skill trigger (package skill via SkillRunner)
        if self._skill_runner and self._skills_manager:
            skill = self._skills_manager.get_skill_by_trigger(skill_name)
            if skill is not None:
                self._skill_runner.activate(skill.name)
                self.query_one(StatusBar).set_active_skill(skill.name)
                log.write(Text(f"Skill activated: {skill.name}", style="bold green"))
                # Run an agent turn so the LLM sees the skill context
                self._start_agent_turn(f"[Skill activated: {skill.name}]")
                return True

        # Try as legacy skill invocation
        if self._skills_manager:
            content = self._skills_manager.load_skill(skill_name)
            if content is not None:
                if self._ctx:
                    self._ctx.add_message("system", f"[Skill: {skill_name}]\n{content}")
                log.write(Text(f"Loaded skill: {skill_name}", style="bold green"))
                return True

        return False

    async def _run_agent_turn(self, user_input: str) -> None:
        """Run a single agent turn (called as a worker).

        Wires per-turn streaming callbacks to the UI, runs the agent loop,
        then refreshes the token display and skill indicator and auto-saves
        the session when configured. Returns immediately when the session
        context or LLM client has not been created yet.
        """
        ctx = self._ctx
        if ctx is None or self._client is None:
            return

        log = self.query_one("#chat-log", RichLog)
        streaming_widget = self.query_one("#streaming", StreamingStatic)
        status_bar = self.query_one(StatusBar)
        header = self.query_one(HeaderPanel)
        display = DisplayAdapter(log)

        # StreamHandler is per-turn (has per-turn accumulators)
        handler = StreamHandler(self._config.display)
        status_bar.start_streaming()

        def on_content(content: str) -> None:
            streaming_widget.update(
                Panel(Markdown(content), title="Assistant", border_style="green", expand=True)
            )
            streaming_widget.show_streaming()
            # Rough heuristic: count 4 characters as one token.
            stream_tokens = len(content) // 4
            status_bar.update_stream_tokens(stream_tokens)
            header.update_tokens(
                ctx.estimated_tokens + stream_tokens,
                ctx.token_counter.budget,
            )

        def on_thinking() -> None:
            streaming_widget.update(Text("Thinking...", style="dim"))
            streaming_widget.show_streaming()

        def on_done() -> None:
            streaming_widget.hide_streaming()
            status_bar.stop_streaming()

        handler.set_callbacks(on_content=on_content, on_thinking=on_thinking, on_done=on_done)

        agent = AgentLoop(
            self._config, ctx, self._client, handler,
            self._tool_registry, self._permissions, display,
            debug_logger=self._debug_logger,
            skills_manager=self._skills_manager,
            skill_runner=self._skill_runner,
        )
        await agent.run_turn(user_input)
        status_bar.stop_streaming()

        # Update token display in header
        header.update_tokens(ctx.estimated_tokens, ctx.token_counter.budget)

        # Update skill indicator (skill may have been deactivated via finish_skill)
        runner = self._skill_runner
        if runner:
            status_bar.set_active_skill(runner.active_skill_name if runner.is_active else None)

        # Auto-save
        if self._config.session.auto_save:
            self._save_session()

    async def _show_permission_modal(self, tool_name: str, description: str) -> bool:
        """Show a modal dialog for tool permission approval and return the user's answer."""
        return await self.push_screen_wait(PermissionModal(tool_name, description))

    def action_cancel_or_quit(self) -> None:
        """Handle Ctrl+C: first press cancels worker, second press quits.

        When no agent turn is running, the first press saves the session and
        quits immediately.
        """
        self._cancel_count += 1
        worker = self._current_worker
        if worker is None or self._cancel_count >= 2:
            self._save_session()
            self.exit()
            return

        worker.cancel()
        log = self.query_one("#chat-log", RichLog)
        log.write(Text("⚠ Cancelling... (press Ctrl+C again to quit)", style="yellow"))

    def action_cycle_mode(self) -> None:
        """Cycle through agent modes: Normal → Plan → Auto → Normal."""
        if self._permissions is None:
            return
        cycle = {
            AgentMode.NORMAL: AgentMode.PLAN,
            AgentMode.PLAN: AgentMode.AUTO,
            AgentMode.AUTO: AgentMode.NORMAL,
        }
        new_mode = cycle[self._permissions.mode]
        self._set_mode(new_mode)
        log = self.query_one("#chat-log", RichLog)
        log.write(Text(f"Mode: {new_mode.value}", style="bold green"))

    async def on_unmount(self) -> None:
        """Clean up the LLM client on app shutdown."""
        if self._client is not None:
            await self._client.close()

    def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
        """Handle worker completion or failure.

        Only agent-turn workers are considered. Errors are always written to
        the chat log. The worker handle and streaming UI are reset only when
        the finished worker is the one currently tracked: agent turns are
        started with ``exclusive=True``, so the cancellation event of a
        replaced worker can arrive after the new worker has been stored, and
        must not clear the new worker's handle or stop its spinner.
        """
        from textual.worker import WorkerState

        worker = event.worker
        if worker.name != self._AGENT_WORKER_NAME:
            return

        if event.state == WorkerState.ERROR:
            log = self.query_one("#chat-log", RichLog)
            error = worker.error
            log.write(Text(f"✗ Agent error: {error}", style="bold red"))

        if event.state not in (WorkerState.SUCCESS, WorkerState.ERROR, WorkerState.CANCELLED):
            return

        if worker is not self._current_worker:
            # Stale event from a worker that has already been replaced.
            return

        self._current_worker = None
        # Hide streaming widget and stop spinner in case they were left active
        streaming = self.query_one("#streaming", StreamingStatic)
        streaming.hide_streaming()
        self.query_one(StatusBar).stop_streaming()

    def _save_session(self) -> Path | None:
        """Save session quietly, return path or None.

        Nothing is saved when there is no session manager, no session
        context, or the context holds no messages. An OSError during saving
        is logged as a warning and reported as None.
        """
        if self._session_mgr and self._ctx and self._ctx.message_count > 0:
            try:
                return self._session_mgr.save(self._ctx)
            except OSError as e:
                logger.warning("session_save_failed", error=str(e))
        return None