Compare commits

..

20 Commits

Author SHA1 Message Date
16d79df421 fix: empty response handling, /no_think model gating, per-model profiles
- Detect empty LLM responses (no content, no tool calls) instead of
  silently treating them as task completion. Retries once without tools
  before warning the user.
- Gate /no_think system message and chat_template_kwargs to Qwen/QwQ
  models only — sending /no_think to llama3.x caused empty responses.
- Add model_profiles config section for per-model overrides (token
  budget, thinking, temperature, max_tokens) matched by name prefix.
  Applied at startup and on /model switch.
- Update SessionManager on /model switch so session files record the
  correct model.
- Add NDJSON fallback in SSE stream parser for Ollama compatibility.
- Improve read_file error to suggest find_files on FileNotFoundError.
- Add diagnostic logging for empty streams and empty results.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 23:09:04 -05:00
1ee721ac10 Merge branch 'bugfix/model-command-regex' 2026-03-11 22:31:50 -05:00
d54a3480b8 fix: /model command no longer incorrectly matched as /mode
Use exact first-word matching instead of startswith() for /models and
/mode command routing. Accept /model as an alias for /models.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 22:31:45 -05:00
d3b286ba40 Merge branch 'feature/file-cache-lru' 2026-03-11 22:26:56 -05:00
d829e6553c feat: add tool-level file cache with LRU eviction and mtime invalidation
Introduces FileCache (OrderedDict LRU, st_mtime_ns validation) to avoid
redundant disk reads and duplicate content in conversation context.
Read tools return a short "[Cached]" message on cache hit instead of
resending unchanged file content, saving tokens. Write/edit/delete tools
invalidate affected paths; str_replace pre-warms the cache after edits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 22:26:51 -05:00
2c532adbbc Merge branch 'feature/read-many-files-and-fixes' 2026-03-11 21:57:39 -05:00
be1ea81102 fix: preserve streaming UI callbacks across agent loop iterations
StreamHandler.reset() was clearing on_content, on_thinking, and on_done
callbacks after every LLM response, but they were only set once per turn.
This caused the thinking indicator and streaming display to stop working
after the first tool call in a multi-step agent turn.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:59 -05:00
3fe0f7af47 style: change header background from orange to dark cyan
Improves readability of the lightning bolt emoji and gives the header
a distinctive look.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:57 -05:00
05754fe06b feat: add read_many_files tool for batch file reading
Reduces LLM round-trips by allowing multiple files to be read in a
single tool call. Uses best-effort error handling so partial failures
still return successful reads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:53 -05:00
0886727437 Merge branch 'feature/header-panel-agent-modes' 2026-03-11 21:36:27 -05:00
638aecb561 feat: add custom HeaderPanel widget and switchable agent modes
Replace built-in Header with a custom HeaderPanel showing model name,
mode badge, and live token usage. Add AgentMode enum (normal/plan/auto)
with mode-aware permission gating — Plan mode restricts to read-only
tools, Auto mode auto-approves everything. Includes /mode slash command
and Ctrl+P keybinding to cycle modes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:36:23 -05:00
b878408f3e Merge branch 'fix/remove-cat-allowed-command' 2026-03-11 21:15:05 -05:00
5b5c3098bb fix: remove cat from allowed shell commands and fix stale display tests
Remove cat from allowed_commands to close shell redirect bypass vector
(read_file provides safer alternative). Update display tests to match
render_user_message returning Text instead of Panel, and shell test to
use head instead of cat.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:14:21 -05:00
4e3da84578 Merge branch 'fix/display-bug-and-shell-write-bypass' 2026-03-11 20:58:55 -05:00
2ad3df521d fix: display assistant content with finish tool call + block shell write redirects
Bug 1: Assistant text content was silently dropped when the LLM response
included both content and tool calls (e.g. finish with a summary). Now
content is displayed before tool call execution regardless.

Bug 2: Shell redirect operators (>, >>, <<) allowed bypassing file-write
permissions when the base command (e.g. cat) was in the allowed list.
Redirects now require explicit user approval in permissions, and the
shell tool itself blocks them as defense-in-depth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:58:07 -05:00
4496fce354 docs: update README with full config reference, remove completed roadmap
Expands README with complete config.yaml reference, CLI options table,
skills documentation, and updated command list. Removes the old roadmap
(all phases complete). Updates tweaks.md with current design notes.
Adds .sneakycode/ to .gitignore and includes superpowers design specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:05:16 -05:00
133bcbda57 feat: re-echo condensed user prompt in chat log
Shows user input as a compact one-liner ("You: prompt text") instead of
a full panel. Multi-line input is collapsed to the first line with a
"(+N lines)" suffix, and long lines are truncated at 120 chars.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:05:10 -05:00
7705008b9c fix: prevent /no_think tag from leaking into user message content
The /no_think directive was appended directly to the user's message text,
causing the LLM to interpret it as user input (e.g., referencing it as a
directory path). Now injected as a separate system message after the last
user message. Also adds which, jq, type, and file to shell allowed_commands
(were mistakenly placed in permissions.auto_approve).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:04:47 -05:00
9273d14845 Merge branch 'feature/disable-thinking-mode' 2026-03-11 19:36:49 -05:00
f0d8ef8f0a feat: add thinking mode toggle to suppress reasoning-only response loops
Adds `llm.thinking` config option (default: true) that when disabled:
- Injects /no_think into the last user message for Qwen 3.x compatibility
- Sends chat_template_kwargs in API payload for backends that support it
- Silently and immediately nudges on reasoning-only responses instead of
  showing warnings and wasting retry iterations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 19:34:36 -05:00
28 changed files with 3323 additions and 240 deletions

3
.gitignore vendored
View File

@@ -34,3 +34,6 @@ htmlcov/
# Worktrees
.worktrees/
# SneakyCode local data
.sneakycode/

View File

@@ -26,14 +26,19 @@ pip install -e ".[dev]"
## Configuration
Edit `config/config.yaml` to configure the agent. Key settings:
Edit `config/config.yaml` to configure the agent. The full configuration reference:
```yaml
llm:
model: "qwen3.5:latest" # Ollama model name
endpoint: "http://localhost:11434" # Ollama endpoint
api_path: "/v1/chat/completions" # API endpoint path
temperature: 0.1 # Sampling temperature
max_tokens: 4096 # Maximum tokens in LLM response
timeout: 120 # Request timeout in seconds
max_retries: 3 # Retry attempts on transient errors
retry_backoff_base: 1.0 # Exponential backoff base (seconds)
retry_backoff_max: 30.0 # Maximum backoff seconds
agent:
max_iterations: 25 # Max tool-call iterations per turn
@@ -43,6 +48,7 @@ agent:
truncation_threshold: 0.85 # Budget fraction that triggers truncation
session:
session_dir: ".sneakycode/sessions" # Directory for session files
auto_save: true # Save session after each turn
max_session_age_hours: 72 # Auto-cleanup old sessions
offer_resume: true # Offer to resume on startup
@@ -51,6 +57,48 @@ permissions:
auto_approve: [read_file, list_dir, grep_files, find_files, finish]
prompt_user: [write_file, delete_file, run_command, str_replace, patch_apply, make_dir]
deny: []
tools:
shell:
allowed_commands: # Commands the LLM may run
- git
- python
- pip
- pytest
- ruff
- ls
- cat
- head
- tail
- wc
- diff
- grep
- find
- echo
denied_commands: # Blocked commands
- rm -rf /
- sudo
- curl
- wget
max_output_bytes: 65536 # Max captured output size (bytes)
filesystem:
max_file_size_bytes: 1048576 # 1 MB — max file size for read/write
binary_detection: true # Detect and reject binary files
display:
show_tool_calls: true # Show tool call details in output
show_token_usage: true # Show token usage stats
stream_output: true # Stream LLM output to terminal
skills:
enabled: true # Enable the skills system
directories: # Directories to scan for skill files
- ".sneakycode/skills"
debug:
enabled: false # Enable debug logging
log_dir: ".sneakycode/logs" # Debug log directory
max_files: 10 # Max debug log files to retain
```
Environment variable `SNEAKYCODE_CONFIG` can override the config file path.
@@ -58,9 +106,12 @@ Environment variable `SNEAKYCODE_CONFIG` can override the config file path.
## Usage
```bash
# Start the interactive REPL
# Start the interactive TUI
sneakycode
# Open a specific project directory
sneakycode /path/to/project
# Or run directly
python -m app.main
@@ -68,25 +119,38 @@ python -m app.main
sneakycode --config path/to/config.yaml --verbose --log-file sneakycode.log
```
### CLI Options
| Option | Description |
|--------------------------|--------------------------------------------------|
| `DIRECTORY` | Project directory to use as workspace root |
| `--config PATH` | Path to config YAML file (default: `config/config.yaml`) |
| `-v`, `--verbose` | Enable verbose (DEBUG) logging |
| `--log-file PATH` | Path to log file for persistent logging |
### REPL Commands
| Command | Description |
|------------|--------------------------------------|
| `/quit` | Save session and exit |
|-------------------|----------------------------------------------------|
| `/help` | Show available commands |
| `/quit` | Save session and exit (also `/exit`, `/bye`) |
| `/history` | Show conversation history |
| `/clear` | Clear conversation history |
| `/save` | Manually save session |
| `/session` | Show session info (messages, tokens) |
| `/session` | Show session info (messages, tokens, start time) |
| `/models` | List available Ollama models |
| `/models <name>` | Switch to a different model |
| `/skills` | List available skills |
### Session Persistence
Sessions are automatically saved after each agent turn and on exit. On startup, SneakyCode offers to resume the most recent session for the current workspace.
Session files are stored in `.sneakycode/sessions/` within the workspace root.
Session files are stored in `.sneakycode/sessions/` within the workspace root (configurable via `session.session_dir`).
## Available Tools
SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools.md) for the full reference.
SneakyCode provides tools across 6 categories. See [docs/tools.md](docs/tools.md) for the full reference.
| Category | Tools | Permission |
|------------|-------------------------------------------------|---------------|
@@ -96,6 +160,17 @@ SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools
| Edit | `str_replace`, `patch_apply` | User confirm |
| Shell | `run_command` | User confirm |
| Control | `finish` | Auto-approved |
| Skills | `load_skill` | Auto-approved |
The `load_skill` tool is available when `skills.enabled` is `true` in the config. It allows the LLM to load skill instructions from the configured skill directories.
## Skills
SneakyCode includes a skills system that lets you provide reusable instruction sets to the LLM. Skills are markdown files placed in `.sneakycode/skills/` (or any directory listed in `skills.directories`).
Skills are auto-discovered on startup. The LLM can load them via the `load_skill` tool, and you can list available skills with the `/skills` command.
To create a skill, add a `.md` file to your skills directory with a descriptive filename (e.g., `refactoring.md`). The file content is injected into the conversation when the skill is loaded.
## Development
@@ -121,6 +196,7 @@ app/
├── models/ # Pydantic config and message schemas
├── services/ # LLM client, streaming, permissions, session persistence
├── tools/ # Tool implementations (one file per group)
├── ui/ # Textual TUI application and widgets
└── utils/ # Logging, display, file helpers, token counter
config/
└── config.yaml # Application configuration

View File

@@ -7,7 +7,7 @@ import time
from typing import TYPE_CHECKING, Any
from app.agent.context import SessionContext
from app.models.config import AppConfig
from app.models.config import AgentMode, AppConfig
from app.models.message import Message
from app.models.tool_call import ToolCall, ToolResult, ToolResultStatus
from app.services.llm import LLMClient, LLMConnectionError, LLMError, LLMStreamError
@@ -59,6 +59,12 @@ class AgentLoop:
self._skills = skills_manager
self._skill_runner = skill_runner
self._tools_schema = registry.get_openai_tools_schema()
if self._permissions.mode == AgentMode.PLAN:
read_only = PermissionsService.READ_ONLY_TOOLS
self._tools_schema = [
t for t in self._tools_schema
if t["function"]["name"] in read_only
]
self._system_prompt = self._build_system_prompt()
self._cancelled = False
@@ -89,12 +95,47 @@ class AgentLoop:
f"\n\nCurrently active skill: {self._skill_runner.active_skill_name}. "
"When the skill's objective is complete, call the `finish_skill` tool."
)
if self._permissions.mode == AgentMode.PLAN:
prompt += (
"\n\nYou are in PLAN mode. You may only use read-only tools: "
"read_file, list_dir, grep_files, find_files, finish. "
"Do NOT attempt to write files, edit code, or run commands. "
"Instead, describe what changes you would make, which files "
"you would modify, and provide the reasoning for each change."
)
return prompt
# Models whose chat templates understand /no_think directives.
_THINKING_MODEL_PREFIXES = ("qwen", "qwq")
def _model_supports_no_think(self) -> bool:
"""Check if the current model uses a thinking chat template."""
model_lower = self._config.llm.model.lower()
return any(model_lower.startswith(p) for p in self._THINKING_MODEL_PREFIXES)
def _get_messages_with_system_prompt(self) -> list[Message]:
"""Prepend the system prompt to conversation history."""
"""Prepend the system prompt to conversation history.
When thinking is disabled on a model that supports it, appends a
system-level /no_think directive after the last user message so
Qwen 3.x (and similar) chat templates see it.
"""
system_msg = Message(role="system", content=self._system_prompt)
return [system_msg] + self._ctx.get_history()
history = self._ctx.get_history()
if not self._config.llm.thinking and self._model_supports_no_think() and history:
history = list(history)
# Find last user message and insert a system hint after it
for i in range(len(history) - 1, -1, -1):
if history[i].role == "user":
no_think_msg = Message(
role="system",
content="/no_think",
)
history.insert(i + 1, no_think_msg)
break
return [system_msg] + history
async def run_turn(self, user_input: str) -> None:
"""Execute one full agent turn: add user message, loop until done.
@@ -107,6 +148,7 @@ class AgentLoop:
max_iter = self._config.agent.max_iterations
reasoning_only_streak = 0
empty_streak = 0
for iteration in range(1, max_iter + 1):
if self._cancelled:
if self._display:
@@ -161,6 +203,10 @@ class AgentLoop:
reasoning_only_streak += 1
self._ctx.pop_last_message()
# When thinking is disabled, reasoning-only is expected model noise.
# Nudge immediately and silently to avoid wasting iterations.
thinking_disabled = not self._config.llm.thinking
# If the last context messages are tool errors, nudge immediately
# rather than wasting retries — the model is likely confused by the error.
has_recent_tool_error = any(
@@ -168,9 +214,14 @@ class AgentLoop:
for m in self._ctx.get_history()[-3:]
)
if has_recent_tool_error or reasoning_only_streak >= _MAX_REASONING_RETRIES:
# Nudge the model by injecting a user hint
if self._display:
should_nudge = (
thinking_disabled
or has_recent_tool_error
or reasoning_only_streak >= _MAX_REASONING_RETRIES
)
if should_nudge:
if not thinking_disabled and self._display:
self._display.write_warning(
f"Model produced reasoning but no response {reasoning_only_streak} times. "
"Nudging model to respond..."
@@ -188,10 +239,42 @@ class AgentLoop:
# Successful response — reset streak
reasoning_only_streak = 0
# No tool calls → task complete (plain text response)
if not assistant_msg.tool_calls:
# Detect completely empty response (no content, no tool calls)
if not assistant_msg.content and not assistant_msg.tool_calls:
empty_streak += 1
self._ctx.pop_last_message() # Don't keep empty messages
if empty_streak >= 2:
if self._display:
self._display.write_warning(
"Model returned repeated empty responses — "
"try a different model or check Ollama logs."
)
break
if self._display:
self._display.write_warning("Model returned empty response. Retrying without tools...")
# Retry without tool schemas — some models return empty when
# tools are in the payload but the model can't handle them.
assistant_msg = await self._llm_step(skip_tools=True)
if assistant_msg is None:
break
if assistant_msg.content:
self._ctx.add_message("assistant", assistant_msg.content)
if self._display:
self._display.write_assistant_message(assistant_msg.content)
self._handler.reset()
break
# Still empty even without tools
self._handler.reset()
continue
empty_streak = 0 # reset on successful non-empty response
# Display any assistant text content (even if tool calls follow)
if self._display and assistant_msg.content:
self._display.write_assistant_message(assistant_msg.content)
# No tool calls → task complete (plain text response)
if not assistant_msg.tool_calls:
break
# Execute tool calls
@@ -219,21 +302,25 @@ class AgentLoop:
if self._display:
self._display.write_warning(f"Agent reached maximum iterations ({max_iter}). Stopping.")
async def _llm_step(self) -> Message | None:
async def _llm_step(self, *, skip_tools: bool = False) -> Message | None:
"""Stream one LLM response and return the accumulated Message.
Uses retry-enabled streaming. On mid-stream errors, attempts to recover
partial content if available.
Args:
skip_tools: If True, send the request without tool schemas (fallback mode).
Returns:
The assistant Message, or None if an error occurred.
"""
messages = self._get_messages_with_system_prompt()
if self._debug:
self._debug.log_request(messages, self._config.llm.model)
tools = None if skip_tools else self._tools_schema
t0 = time.monotonic()
try:
chunk_iter = self._client.stream_chat_with_retry(messages, tools=self._tools_schema)
chunk_iter = self._client.stream_chat_with_retry(messages, tools=tools)
result = await self._handler.process_stream(chunk_iter)
if result and self._debug:
elapsed = (time.monotonic() - t0) * 1000

View File

@@ -1,6 +1,7 @@
"""Pydantic configuration models mapping to config/config.yaml."""
import os
from enum import StrEnum
from pathlib import Path
from typing import Any
@@ -8,6 +9,31 @@ import yaml
from pydantic import BaseModel, Field, model_validator
class AgentMode(StrEnum):
"""Runtime agent mode controlling permission behavior."""
NORMAL = "normal"
PLAN = "plan"
AUTO = "auto"
class ModelProfile(BaseModel):
"""Per-model overrides applied when switching models."""
max_conversation_tokens: int | None = Field(
default=None, description="Token budget override for this model's context window"
)
thinking: bool | None = Field(
default=None, description="Override thinking mode for this model"
)
temperature: float | None = Field(
default=None, description="Override sampling temperature"
)
max_tokens: int | None = Field(
default=None, description="Override max response tokens"
)
class LLMConfig(BaseModel):
"""LLM backend configuration."""
@@ -20,6 +46,10 @@ class LLMConfig(BaseModel):
max_retries: int = Field(default=3, description="Max retry attempts on transient errors")
retry_backoff_base: float = Field(default=1.0, description="Base seconds for exponential backoff")
retry_backoff_max: float = Field(default=30.0, description="Maximum backoff seconds")
thinking: bool = Field(
default=True,
description="Enable model thinking/reasoning mode (disable to reduce reasoning-only loops)",
)
extra_body: dict[str, Any] = Field(
default_factory=dict,
description="Extra parameters merged into the API request body (model-specific)",
@@ -60,11 +90,19 @@ class ShellToolConfig(BaseModel):
max_output_bytes: int = Field(default=65536, description="Max output capture size in bytes")
class FileCacheConfig(BaseModel):
"""File cache configuration."""
enabled: bool = Field(default=True, description="Enable file content caching")
max_entries: int = Field(default=128, description="Maximum cached file entries (LRU eviction)")
class FilesystemToolConfig(BaseModel):
"""Filesystem tool limits."""
max_file_size_bytes: int = Field(default=1_048_576, description="Max file size for read/write")
binary_detection: bool = Field(default=True, description="Detect and reject binary files")
cache: FileCacheConfig = Field(default_factory=FileCacheConfig, description="File cache settings")
class ToolsConfig(BaseModel):
@@ -124,6 +162,10 @@ class AppConfig(BaseModel):
session: SessionConfig = Field(default_factory=SessionConfig)
debug: DebugConfig = Field(default_factory=DebugConfig)
skills: SkillsConfig = Field(default_factory=SkillsConfig)
model_profiles: dict[str, ModelProfile] = Field(
default_factory=dict,
description="Per-model overrides keyed by model name prefix",
)
@model_validator(mode="after")
def resolve_workspace_root(self) -> "AppConfig":
@@ -131,6 +173,39 @@ class AppConfig(BaseModel):
self.agent.workspace_root = self.agent.workspace_root.resolve()
return self
def get_model_profile(self, model: str) -> ModelProfile | None:
"""Find the best matching model profile by prefix.
Matches the longest prefix first (e.g., "llama3.1" beats "llama3"
for model "llama3.1:latest"). Returns None if no profile matches.
"""
model_lower = model.lower().split(":")[0] # strip tag
best_match: str | None = None
for key in self.model_profiles:
key_lower = key.lower()
if model_lower == key_lower or model_lower.startswith(key_lower):
if best_match is None or len(key) > len(best_match):
best_match = key
return self.model_profiles.get(best_match) if best_match else None
def apply_model_profile(self, model: str) -> ModelProfile | None:
"""Apply the matching model profile overrides to the active config.
Returns the applied profile, or None if no profile matched.
"""
profile = self.get_model_profile(model)
if profile is None:
return None
if profile.max_conversation_tokens is not None:
self.agent.max_conversation_tokens = profile.max_conversation_tokens
if profile.thinking is not None:
self.llm.thinking = profile.thinking
if profile.temperature is not None:
self.llm.temperature = profile.temperature
if profile.max_tokens is not None:
self.llm.max_tokens = profile.max_tokens
return profile
# Default config file location relative to project root
_DEFAULT_CONFIG_PATH = Path("config/config.yaml")

View File

@@ -151,7 +151,12 @@ class LLMClient:
if tools:
payload["tools"] = tools
# Merge model-specific extra parameters (e.g., enable_thinking, reasoning_effort)
# When thinking is disabled, inject chat_template_kwargs for backends
# that support it (Qwen 3.x thinking models).
if not self._config.thinking and self._config.model.lower().startswith(("qwen", "qwq")):
payload.setdefault("chat_template_kwargs", {})["enable_thinking"] = False
# Merge model-specific extra parameters (e.g., reasoning_effort)
if self._config.extra_body:
payload.update(self._config.extra_body)
@@ -166,20 +171,32 @@ class LLMClient:
status_code=response.status_code,
)
chunk_count = 0
async for line in response.aiter_lines():
if not line.startswith("data: "):
line = line.strip()
if not line:
continue
data = line[6:] # strip "data: " prefix
# SSE format: "data: {json}" or "data: [DONE]"
if line.startswith("data: "):
data = line[6:]
if data.strip() == "[DONE]":
return
break
elif line.startswith("{"):
# Plain NDJSON fallback (some Ollama versions)
data = line
else:
continue
try:
yield json.loads(data)
chunk_count += 1
except json.JSONDecodeError:
logger.warning("malformed_sse_chunk", data=data[:200])
if chunk_count == 0:
logger.warning("empty_stream", model=self._config.model)
except httpx.ConnectError as e:
raise LLMConnectionError(f"Cannot connect to LLM endpoint: {e}") from e
except httpx.TimeoutException as e:

View File

@@ -4,16 +4,20 @@ from __future__ import annotations
import json
import logging
import re
import shlex
from collections.abc import Awaitable, Callable
from app.models.config import PermissionsConfig, ToolsConfig
from app.models.config import AgentMode, PermissionsConfig, ToolsConfig
logger = logging.getLogger(__name__)
# Type alias for the async prompt callback
PromptCallback = Callable[[str, str], Awaitable[bool]]
# Detect shell redirects that write to files (>, >>, heredocs)
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
class PermissionDenied(Exception):
"""Raised when a tool is denied execution by permissions policy."""
@@ -26,6 +30,10 @@ class PermissionsService:
shows a modal dialog. Without a callback, unlisted tools are denied.
"""
READ_ONLY_TOOLS: frozenset[str] = frozenset({
"read_file", "list_dir", "grep_files", "find_files", "finish",
})
def __init__(
self,
config: PermissionsConfig,
@@ -34,6 +42,16 @@ class PermissionsService:
self.config = config
self._tools_config = tools_config
self._prompt_callback: PromptCallback | None = None
self._mode: AgentMode = AgentMode.NORMAL
@property
def mode(self) -> AgentMode:
"""Current agent mode."""
return self._mode
@mode.setter
def mode(self, value: AgentMode) -> None:
self._mode = value
def set_prompt_callback(self, callback: PromptCallback) -> None:
"""Set the async callback used to prompt the user for permission.
@@ -63,6 +81,16 @@ class PermissionsService:
logger.info("Tool '%s' is in deny list — blocked", tool_name)
return False
if self._mode == AgentMode.AUTO:
logger.debug("Tool '%s' auto-approved (AUTO mode)", tool_name)
return True
if self._mode == AgentMode.PLAN:
if tool_name not in self.READ_ONLY_TOOLS:
logger.info("Tool '%s' blocked in Plan mode (read-only tools only)", tool_name)
return False
return True
if tool_name in self.config.auto_approve:
logger.debug("Tool '%s' is auto-approved", tool_name)
return True
@@ -104,6 +132,11 @@ class PermissionsService:
logger.info("Shell command '%s' matches denied prefix '%s'", cmd, denied)
return False
# Detect shell redirects that write to files — require approval
if _WRITE_REDIRECT_PATTERN.search(cmd):
logger.info("Shell command '%s' contains file-write redirect — requiring approval", cmd)
return None # fall through to user prompt
# Allowed commands: base executable match
if shell_config.allowed_commands:
if base_cmd in shell_config.allowed_commands:

View File

@@ -52,6 +52,10 @@ class SessionManager:
self._session_dir = workspace_root / config.session_dir
self._session_id = f"{self._workspace_hash}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
def update_model(self, model: str) -> None:
"""Update the model name for session metadata."""
self._model = model
def save(self, ctx: "SessionContext") -> Path:
"""Save session state to a JSON file via atomic write.

View File

@@ -60,8 +60,10 @@ class StreamHandler:
"""
thinking_notified = False
last_update_time = 0.0
chunk_count = 0
async for chunk in chunk_iter:
chunk_count += 1
self._process_chunk(chunk)
if not self._display_config.stream_output:
@@ -96,6 +98,14 @@ class StreamHandler:
self._on_done()
tool_calls = self._build_tool_calls() or None
if chunk_count > 0 and not self._accumulated_content and not tool_calls:
logger.debug(
"stream_empty_result",
chunks_received=chunk_count,
had_reasoning=bool(self._accumulated_reasoning),
)
return Message(
role="assistant",
content=self._accumulated_content or None,
@@ -183,11 +193,8 @@ class StreamHandler:
return bool(self._accumulated_reasoning) and not self._accumulated_content and not self._tool_calls
def reset(self) -> None:
"""Clear all accumulators for the next turn."""
"""Clear accumulators for the next LLM call, preserving UI callbacks."""
self._accumulated_content = ""
self._accumulated_reasoning = ""
self._tool_calls.clear()
self._usage = None
self._on_content = None
self._on_thinking = None
self._on_done = None

View File

@@ -1,13 +1,17 @@
"""Edit tools: str_replace and patch_apply."""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from app.models.config import AppConfig
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache, cached_read_file
from app.utils.file_helpers import (
FileSizeError,
PathSecurityError,
@@ -37,6 +41,12 @@ class StrReplaceTool(BaseTool):
)
params_model = StrReplaceParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(
self, *, tool_call_id: str, file_path: str, old_str: str, new_str: str, **kwargs: Any
) -> ToolResult:
@@ -44,11 +54,12 @@ class StrReplaceTool(BaseTool):
# Read the file
try:
content = safe_read_file(
content = cached_read_file(
file_path,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
except PathSecurityError as exc:
return ToolResult(
@@ -117,8 +128,14 @@ class StrReplaceTool(BaseTool):
safe_path = resolve_safe_path(file_path, self.workspace_root)
rel_path = safe_path.relative_to(self.workspace_root)
except (PathSecurityError, ValueError):
safe_path = None
rel_path = Path(file_path)
# Pre-warm cache with the new content (we already have it in memory).
if self._file_cache is not None and safe_path is not None:
self._file_cache.invalidate(safe_path)
self._file_cache.put(safe_path, new_content)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
@@ -144,6 +161,12 @@ class PatchApplyTool(BaseTool):
)
params_model = PatchApplyParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, patch: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(file_path, self.workspace_root)
@@ -195,6 +218,9 @@ class PatchApplyTool(BaseTool):
error=f"Patch failed (exit {result.returncode}): {result.stderr or result.stdout}",
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:

View File

@@ -1,12 +1,16 @@
"""Filesystem tools: read_file, list_dir, write_file, make_dir, delete_file."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from app.models.config import AppConfig
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache, cached_read_file
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
@@ -23,6 +27,12 @@ class ReadFileParams(BaseModel):
file_path: str = Field(description="Path to the file to read (relative to workspace root)")
class ReadManyFilesParams(BaseModel):
"""Parameters for the read_many_files tool."""
file_paths: list[str] = Field(description="List of file paths to read (relative to workspace root)")
class ReadFileTool(BaseTool):
"""Read the contents of a file within the workspace."""
@@ -30,14 +40,22 @@ class ReadFileTool(BaseTool):
description = "Read the full contents of a text file. Returns the file content as a string."
params_model = ReadFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
hits_before = self._file_cache.stats.hits if self._file_cache else 0
try:
content = safe_read_file(
content = cached_read_file(
file_path,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
except PathSecurityError as exc:
return ToolResult(
@@ -47,11 +65,12 @@ class ReadFileTool(BaseTool):
error=str(exc),
)
except FileNotFoundError as exc:
filename = Path(file_path).name
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
error=f"{exc}. Use find_files to locate it, e.g. find_files(pattern=\"{filename}\")",
)
except FileSizeError as exc:
return ToolResult(
@@ -68,6 +87,23 @@ class ReadFileTool(BaseTool):
error=str(exc),
)
# On cache hit the file is unchanged — its content is already in
# conversation context from the earlier read, so avoid resending it.
was_cache_hit = (
self._file_cache is not None
and self._file_cache.stats.hits > hits_before
)
if was_cache_hit:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=(
f"[Cached] {file_path} is unchanged since last read "
f"({len(content):,} chars). Content is already in conversation context."
),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
@@ -76,6 +112,76 @@ class ReadFileTool(BaseTool):
)
class ReadManyFilesTool(BaseTool):
"""Read contents of multiple files at once."""
name = "read_many_files"
description = (
"Read contents of multiple files at once. Returns each file's content "
"prefixed with its path header."
)
params_model = ReadManyFilesParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_paths: list[str], **kwargs: Any) -> ToolResult:
if not file_paths:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="file_paths list is empty",
)
fs_config = self.config.tools.filesystem
sections: list[str] = []
success_count = 0
for fp in file_paths:
hits_before = self._file_cache.stats.hits if self._file_cache else 0
try:
content = cached_read_file(
fp,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
was_hit = (
self._file_cache is not None
and self._file_cache.stats.hits > hits_before
)
if was_hit:
sections.append(
f"=== {fp} ===\n[Cached] Unchanged since last read "
f"({len(content):,} chars). Already in conversation context."
)
else:
sections.append(f"=== {fp} ===\n{content}")
success_count += 1
except (PathSecurityError, FileNotFoundError, FileSizeError, BinaryFileError) as exc:
sections.append(f"=== {fp} ===\n[ERROR] {exc}")
if success_count == 0:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="All files failed to read:\n" + "\n".join(sections),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output="\n".join(sections),
)
class ListDirParams(BaseModel):
"""Parameters for the list_dir tool."""
@@ -167,6 +273,12 @@ class WriteFileTool(BaseTool):
)
params_model = WriteFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, content: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
try:
@@ -191,6 +303,9 @@ class WriteFileTool(BaseTool):
error=str(exc),
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:
@@ -272,6 +387,12 @@ class DeleteFileTool(BaseTool):
description = "Delete a single file. Does not delete directories."
params_model = DeleteFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(file_path, self.workspace_root)
@@ -309,6 +430,9 @@ class DeleteFileTool(BaseTool):
error=f"Failed to delete file: {exc}",
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any
from app.models.config import AppConfig
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache
if TYPE_CHECKING:
from app.services.skills import SkillsManager
@@ -89,6 +90,7 @@ def create_default_registry(
config: AppConfig,
skills_manager: SkillsManager | None = None,
skill_runner: object | None = None,
file_cache: FileCache | None = None,
) -> ToolRegistry:
"""Create a ToolRegistry populated with all built-in tools.
@@ -97,9 +99,10 @@ def create_default_registry(
config: Application configuration.
skills_manager: Optional skills manager for skill tools.
skill_runner: Optional SkillRunner for package skill activation.
file_cache: Optional file cache shared across file-reading tools.
"""
# Read tools
from app.tools.filesystem import ListDirTool, ReadFileTool
from app.tools.filesystem import ListDirTool, ReadFileTool, ReadManyFilesTool
# Write tools
from app.tools.filesystem import DeleteFileTool, MakeDirTool, WriteFileTool
@@ -119,7 +122,8 @@ def create_default_registry(
registry = ToolRegistry()
# Read
registry.register(ReadFileTool(workspace_root, config))
registry.register(ReadFileTool(workspace_root, config, file_cache=file_cache))
registry.register(ReadManyFilesTool(workspace_root, config, file_cache=file_cache))
registry.register(ListDirTool(workspace_root, config))
# Search
@@ -127,13 +131,13 @@ def create_default_registry(
registry.register(FindFilesTool(workspace_root, config))
# Write
registry.register(WriteFileTool(workspace_root, config))
registry.register(WriteFileTool(workspace_root, config, file_cache=file_cache))
registry.register(MakeDirTool(workspace_root, config))
registry.register(DeleteFileTool(workspace_root, config))
registry.register(DeleteFileTool(workspace_root, config, file_cache=file_cache))
# Edit
registry.register(StrReplaceTool(workspace_root, config))
registry.register(PatchApplyTool(workspace_root, config))
registry.register(StrReplaceTool(workspace_root, config, file_cache=file_cache))
registry.register(PatchApplyTool(workspace_root, config, file_cache=file_cache))
# Shell
registry.register(RunCommandTool(workspace_root, config))

View File

@@ -1,5 +1,6 @@
"""Shell tool: run_command."""
import re
import shlex
import subprocess
from typing import Any
@@ -11,6 +12,9 @@ from app.tools.base import BaseTool
_DEFAULT_TIMEOUT = 30
# Detect shell redirects that write to files (>, >>, heredocs)
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
class RunCommandParams(BaseModel):
"""Parameters for the run_command tool."""
@@ -43,6 +47,18 @@ class RunCommandTool(BaseTool):
error=f"Command denied: matches blocked prefix '{denied}'",
)
# Defense-in-depth: flag file-write redirects in tool result
if _WRITE_REDIRECT_PATTERN.search(command):
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=(
f"Command contains file-write redirect (>, >>, or <<) "
f"which bypasses file-write permissions. Use write_file instead."
),
)
# Allow check: first token must be in allowed_commands
try:
tokens = shlex.split(command)

View File

@@ -10,18 +10,19 @@ from rich.panel import Panel
from rich.text import Text
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Header, Input, RichLog
from textual.widgets import Input, RichLog
from textual import work
from app.agent.context import SessionContext
from app.agent.loop import AgentLoop
from app.models.config import AppConfig
from app.models.config import AgentMode, AppConfig
from app.services.llm import LLMClient
from app.services.permissions import PermissionsService
from app.services.session import SessionManager
from app.services.streaming import StreamHandler
from app.tools.registry import create_default_registry
from app.ui.widgets import (
HeaderPanel,
HistoryInput,
PermissionModal,
SessionResumeModal,
@@ -45,6 +46,7 @@ class SneakyCodeApp(App):
BINDINGS = [
Binding("ctrl+c", "cancel_or_quit", "Cancel/Quit", show=False),
Binding("ctrl+p", "cycle_mode", "Cycle Mode"),
]
def __init__(self, config: AppConfig, session_mgr: SessionManager | None = None) -> None:
@@ -61,10 +63,9 @@ class SneakyCodeApp(App):
self._skill_runner = None
self._current_worker: Worker | None = None
self._cancel_count = 0
self.sub_title = config.llm.model
def compose(self) -> ComposeResult:
yield Header()
yield HeaderPanel(model_name=self._config.llm.model)
yield RichLog(id="chat-log", highlight=True, markup=True)
yield StreamingStatic("", id="streaming")
yield StatusBar()
@@ -74,6 +75,9 @@ class SneakyCodeApp(App):
"""Initialize agent components after the app is mounted."""
setup_logging_for_tui()
# Apply model profile for the initial model before creating context
self._config.apply_model_profile(self._config.llm.model)
self._ctx = SessionContext(self._config)
# Create long-lived agent dependencies (reused across turns)
@@ -96,11 +100,20 @@ class SneakyCodeApp(App):
self._config.skills, self._config.agent.workspace_root
)
# Create file cache if enabled
self._file_cache = None
fs_cache_cfg = self._config.tools.filesystem.cache
if fs_cache_cfg.enabled:
from app.utils.file_cache import FileCache
self._file_cache = FileCache(max_entries=fs_cache_cfg.max_entries)
# Create tool registry (SkillRunner wired after registry exists)
self._tool_registry = create_default_registry(
self._config.agent.workspace_root,
self._config,
skills_manager=self._skills_manager,
file_cache=self._file_cache,
)
# Create SkillRunner and late-bind it to skill tools
@@ -156,6 +169,10 @@ class SneakyCodeApp(App):
event.input.record(user_input)
log = self.query_one("#chat-log", RichLog)
# Echo user prompt (condensed for multi-line)
from app.utils.display import render_user_message
log.write(render_user_message(user_input))
# Handle slash commands
if user_input.startswith("/"):
await self._handle_slash_command(user_input, log)
@@ -184,8 +201,10 @@ class SneakyCodeApp(App):
table.add_row("/history", "Show conversation history")
table.add_row("/save", "Manually save session")
table.add_row("/session", "Show session info (messages, tokens, start time)")
table.add_row("/models", "List available Ollama models")
table.add_row("/models <name>", "Switch to a different model")
table.add_row("/models, /model", "List available Ollama models")
table.add_row("/model <name>", "Switch to a different model")
table.add_row("/mode", "Show current agent mode")
table.add_row("/mode normal|plan|auto", "Switch agent mode")
table.add_row("/skills", "List available skills")
table.add_row("/<skill>", "Load a skill by name")
log.write(table)
@@ -215,7 +234,7 @@ class SneakyCodeApp(App):
f"Started: {self._ctx.start_time.isoformat()}",
style="cyan",
))
elif cmd.startswith("/models"):
elif cmd.split()[0] in ("/models", "/model"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
# List available models
@@ -239,8 +258,44 @@ class SneakyCodeApp(App):
else:
new_model = parts[1].strip()
self._config.llm.model = new_model
self.sub_title = new_model
log.write(Text(f"Switched to model: {new_model}", style="bold green"))
if self._session_mgr:
self._session_mgr.update_model(new_model)
# Apply model-specific profile overrides
profile = self._config.apply_model_profile(new_model)
if profile and self._ctx:
# Update token budget if the profile overrides it
self._ctx.token_counter.budget = self._config.agent.max_conversation_tokens
self.query_one(HeaderPanel).update_model(new_model)
header = self.query_one(HeaderPanel)
header.update_tokens(
self._ctx.estimated_tokens if self._ctx else 0,
self._config.agent.max_conversation_tokens,
)
msg = f"Switched to model: {new_model}"
if profile:
overrides = []
if profile.max_conversation_tokens is not None:
overrides.append(f"tokens={profile.max_conversation_tokens:,}")
if profile.thinking is not None:
overrides.append(f"thinking={'on' if profile.thinking else 'off'}")
if overrides:
msg += f" ({', '.join(overrides)})"
log.write(Text(msg, style="bold green"))
elif cmd.split()[0] == "/mode":
parts = command.split(maxsplit=1)
if len(parts) == 1:
current = self._permissions.mode
log.write(Text(f"Current mode: {current.value}", style="cyan"))
else:
mode_str = parts[1].strip().lower()
try:
new_mode = AgentMode(mode_str)
except ValueError:
log.write(Text(f"Unknown mode: {mode_str}. Use normal, plan, or auto.", style="yellow"))
return
self._permissions.mode = new_mode
self.query_one(HeaderPanel).update_mode(new_mode)
log.write(Text(f"Switched to {new_mode.value} mode", style="bold green"))
elif cmd == "/skills":
if self._skills_manager:
skills = self._skills_manager.list_skills()
@@ -302,12 +357,19 @@ class SneakyCodeApp(App):
status_bar.start_streaming()
# Set up streaming UI callbacks
header = self.query_one(HeaderPanel)
def on_content(content: str) -> None:
streaming_widget.update(
Panel(Markdown(content), title="Assistant", border_style="green", expand=True)
)
streaming_widget.show_streaming()
status_bar.update_stream_tokens(len(content) // 4)
stream_tokens = len(content) // 4
status_bar.update_stream_tokens(stream_tokens)
header.update_tokens(
self._ctx.estimated_tokens + stream_tokens,
self._ctx.token_counter.budget,
)
def on_thinking() -> None:
streaming_widget.update(Text("Thinking...", style="dim"))
@@ -331,6 +393,10 @@ class SneakyCodeApp(App):
status_bar.stop_streaming()
# Update token display in header
header = self.query_one(HeaderPanel)
header.update_tokens(self._ctx.estimated_tokens, self._ctx.token_counter.budget)
# Update skill indicator (skill may have been deactivated via finish_skill)
if self._skill_runner and not self._skill_runner.is_active:
status_bar.set_active_skill(None)
@@ -356,6 +422,21 @@ class SneakyCodeApp(App):
log = self.query_one("#chat-log", RichLog)
log.write(Text("⚠ Cancelling... (press Ctrl+C again to quit)", style="yellow"))
def action_cycle_mode(self) -> None:
"""Cycle through agent modes: Normal → Plan → Auto → Normal."""
if self._permissions is None:
return
cycle = {
AgentMode.NORMAL: AgentMode.PLAN,
AgentMode.PLAN: AgentMode.AUTO,
AgentMode.AUTO: AgentMode.NORMAL,
}
new_mode = cycle[self._permissions.mode]
self._permissions.mode = new_mode
self.query_one(HeaderPanel).update_mode(new_mode)
log = self.query_one("#chat-log", RichLog)
log.write(Text(f"Mode: {new_mode.value}", style="bold green"))
async def on_unmount(self) -> None:
"""Clean up the LLM client on app shutdown."""
if self._client is not None:

View File

@@ -55,8 +55,8 @@ Screen {
Input {
dock: bottom;
margin: 0;
border: none;
padding: 0 1;
}
Header {
dock: top;
}
/* HeaderPanel styles are in DEFAULT_CSS on the widget itself */

View File

@@ -11,6 +11,82 @@ from textual.widgets import Button, Input, Static
from rich.text import Text
from app.models.config import AgentMode
# ---------------------------------------------------------------------------
# Header Panel
# ---------------------------------------------------------------------------
class HeaderPanel(Static):
"""Single-line header showing model name, agent mode, and token usage."""
DEFAULT_CSS = """
HeaderPanel {
dock: top;
height: 1;
background: darkcyan;
color: $text;
padding: 0 2;
}
"""
def __init__(self, model_name: str) -> None:
super().__init__("")
self._model_name = model_name
self._mode: AgentMode = AgentMode.NORMAL
self._tokens: int = 0
self._budget: int = 0
def on_resize(self) -> None:
self._refresh_display()
def update_model(self, name: str) -> None:
"""Update the displayed model name."""
self._model_name = name
self._refresh_display()
def update_mode(self, mode: AgentMode) -> None:
"""Update the displayed agent mode."""
self._mode = mode
self._refresh_display()
def update_tokens(self, tokens: int, budget: int) -> None:
"""Update the token usage display."""
self._tokens = tokens
self._budget = budget
self._refresh_display()
def _refresh_display(self) -> None:
"""Rebuild the header text."""
left = Text.assemble(
("⚡ SneakyCode", "bold"),
"",
(self._model_name, "bold"),
)
mode_styles = {
AgentMode.NORMAL: ("NORMAL", "bold black on white"),
AgentMode.PLAN: ("PLAN", "bold black on yellow"),
AgentMode.AUTO: ("AUTO", "bold white on red"),
}
mode_label, mode_style = mode_styles[self._mode]
mode_text = Text.assemble((" ", mode_style), (mode_label, mode_style), (" ", mode_style))
right = Text(f"~{self._tokens:,} / {self._budget:,} tokens")
# Pad between sections
total_content = left.plain + " " + mode_text.plain + " " + right.plain
available = self.size.width if self.size.width > 0 else 80
gap_left = max(1, (available - len(total_content)) // 2)
gap_right = max(1, available - len(total_content) - gap_left)
full = Text.assemble(
left, " " * gap_left, mode_text, " " * gap_right, right,
)
self.update(full)
# ---------------------------------------------------------------------------
# Modal Dialogs
@@ -139,8 +215,6 @@ class StatusBar(Static):
def __init__(self) -> None:
super().__init__("")
self._tokens: int = 0
self._budget: int = 0
self._iteration: int = 0
self._max_iterations: int = 0
self._streaming: bool = False
@@ -149,12 +223,6 @@ class StatusBar(Static):
self._stream_tokens: int = 0
self._active_skill: str | None = None
def update_tokens(self, tokens: int, budget: int) -> None:
"""Update the token usage display."""
self._tokens = tokens
self._budget = budget
self._refresh_display()
def update_iteration(self, iteration: int, max_iterations: int) -> None:
"""Update the iteration count display."""
self._iteration = iteration
@@ -200,8 +268,6 @@ class StatusBar(Static):
parts.append(f"{spinner} Thinking")
if self._stream_tokens > 0:
parts.append(f"~{self._stream_tokens:,} tokens")
if self._budget > 0:
parts.append(f"Tokens: ~{self._tokens:,} / {self._budget:,}")
if self._max_iterations > 0:
parts.append(f"Iteration {self._iteration}/{self._max_iterations}")
self.update(Text(" \u2502 ".join(parts), style="dim"))

View File

@@ -44,9 +44,22 @@ if TYPE_CHECKING:
# ---------------------------------------------------------------------------
def render_user_message(content: str) -> Panel:
"""Render a user message as a styled panel."""
return Panel(content, title="You", border_style="cyan", expand=False)
def render_user_message(content: str) -> Text:
"""Render a condensed user prompt as a single styled line.
Multi-line input is collapsed to the first line with a line count suffix.
Long single lines are truncated.
"""
lines = content.splitlines()
first = lines[0] if lines else content
max_len = 120
if len(first) > max_len:
first = first[:max_len] + ""
suffix = f" (+{len(lines) - 1} lines)" if len(lines) > 1 else ""
text = Text()
text.append("You: ", style="bold cyan")
text.append(first + suffix, style="cyan")
return text
def render_assistant_message(content: str) -> Panel:
@@ -223,8 +236,8 @@ def print_success(message: str) -> None:
def print_user_message(content: str) -> None:
"""Print a user message in a styled panel."""
console.print(Panel(content, title="You", border_style="cyan", expand=False))
"""Print a condensed user prompt line."""
console.print(render_user_message(content))
def print_assistant_message(content: str) -> None:

185
app/utils/file_cache.py Normal file
View File

@@ -0,0 +1,185 @@
"""File cache with LRU eviction and mtime-based invalidation."""
from __future__ import annotations
from collections import OrderedDict
from dataclasses import dataclass, field
from pathlib import Path
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
PathSecurityError,
check_file_size,
is_binary_file,
resolve_safe_path,
)
@dataclass(slots=True)
class CacheEntry:
"""A cached file's content and modification timestamp."""
content: str
mtime_ns: int
@dataclass
class CacheStats:
"""Running statistics for a FileCache instance."""
hits: int = 0
misses: int = 0
invalidations: int = 0
evictions: int = 0
@property
def hit_rate(self) -> float:
"""Return cache hit rate as a float between 0.0 and 1.0."""
total = self.hits + self.misses
if total == 0:
return 0.0
return self.hits / total
class FileCache:
"""LRU file-content cache with mtime-based invalidation.
Keyed by resolved absolute ``Path``. Each lookup performs a cheap
``stat()`` syscall to verify the file hasn't changed on disk — if the
nanosecond mtime differs the entry is evicted and the caller gets a
cache miss.
Not thread-safe (single-threaded agent loop).
"""
def __init__(self, max_entries: int = 128) -> None:
self._max_entries = max_entries
self._entries: OrderedDict[Path, CacheEntry] = OrderedDict()
self._stats = CacheStats()
# -- public API --------------------------------------------------
def get(self, path: Path) -> str | None:
"""Return cached content if *path* hasn't changed, else ``None``.
A ``stat()`` call checks ``st_mtime_ns``; on mismatch the stale
entry is silently removed.
"""
entry = self._entries.get(path)
if entry is None:
self._stats.misses += 1
return None
try:
current_mtime_ns = path.stat().st_mtime_ns
except OSError:
# File gone — evict and miss.
self._remove(path)
self._stats.misses += 1
return None
if current_mtime_ns != entry.mtime_ns:
self._remove(path)
self._stats.invalidations += 1
self._stats.misses += 1
return None
# Cache hit — move to end (most-recently used).
self._entries.move_to_end(path)
self._stats.hits += 1
return entry.content
def put(self, path: Path, content: str) -> None:
"""Store *content* for *path* with its current ``st_mtime_ns``.
Evicts the least-recently-used entry when over capacity.
"""
try:
mtime_ns = path.stat().st_mtime_ns
except OSError:
# Can't stat — don't cache.
return
if path in self._entries:
# Update existing; move to end.
self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
self._entries.move_to_end(path)
else:
self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
# Evict LRU if over capacity.
while len(self._entries) > self._max_entries:
self._entries.popitem(last=False)
self._stats.evictions += 1
def invalidate(self, path: Path) -> None:
"""Remove *path* from the cache if present."""
if path in self._entries:
del self._entries[path]
self._stats.invalidations += 1
def clear(self) -> None:
"""Remove all entries."""
self._entries.clear()
@property
def stats(self) -> CacheStats:
"""Return the running cache statistics."""
return self._stats
def __len__(self) -> int:
return len(self._entries)
# -- internals ---------------------------------------------------
def _remove(self, path: Path) -> None:
"""Delete an entry without bumping invalidation stats."""
self._entries.pop(path, None)
def cached_read_file(
path: str | Path,
workspace_root: Path,
max_size_bytes: int = 1_048_576,
check_binary: bool = True,
cache: FileCache | None = None,
) -> str:
"""Read a file with full security checks, using *cache* when available.
Security checks (path sandboxing, size limit, binary detection) run on
**every** call — only the ``Path.read_text()`` I/O is skipped on a cache
hit.
When *cache* is ``None`` this behaves identically to
:func:`~app.utils.file_helpers.safe_read_file`.
Raises:
PathSecurityError: If the path escapes the workspace.
FileSizeError: If the file is too large.
BinaryFileError: If the file is binary and *check_binary* is True.
FileNotFoundError: If the file does not exist.
"""
safe_path = resolve_safe_path(path, workspace_root)
if not safe_path.exists():
raise FileNotFoundError(f"File not found: {safe_path}")
check_file_size(safe_path, max_size_bytes)
if check_binary and is_binary_file(safe_path):
raise BinaryFileError(f"File appears to be binary: {safe_path}")
# Try cache.
if cache is not None:
cached = cache.get(safe_path)
if cached is not None:
return cached
# Cache miss (or no cache) — read from disk.
content = safe_path.read_text(encoding="utf-8")
if cache is not None:
cache.put(safe_path, content)
return content

View File

@@ -36,6 +36,11 @@ class TokenCounter:
"""The configured token budget."""
return self._budget
@budget.setter
def budget(self, value: int) -> None:
"""Update the token budget (e.g., when switching models)."""
self._budget = value
@property
def cumulative_usage(self) -> TokenUsage:
"""Cumulative token usage across all tracked calls."""

View File

@@ -10,21 +10,32 @@ llm:
max_retries: 3
retry_backoff_base: 1.0
retry_backoff_max: 30.0
thinking: false # Disable model thinking/reasoning mode (reduces reasoning-only loops)
# Extra parameters merged into the API request body (model-specific).
# Examples:
# Qwen 3.x: enable_thinking: false
# DeepSeek: enable_thinking: false
# OpenAI: reasoning_effort: "low"
extra_body:
enable_thinking: false
extra_body: {}
agent:
max_iterations: 25
max_conversation_tokens: 32000
max_conversation_tokens: 32000 # Default token budget (overridden by model_profiles)
workspace_root: "."
truncation_keep_recent: 10
truncation_threshold: 0.85
# Per-model overrides — matched by longest model name prefix.
# Unset fields fall through to the defaults above.
model_profiles:
llama3:
max_conversation_tokens: 120000
thinking: false
qwen:
max_conversation_tokens: 32000
thinking: false
qwq:
max_conversation_tokens: 32000
thinking: true
permissions:
auto_approve:
- read_file
@@ -50,7 +61,6 @@ tools:
- pytest
- ruff
- ls
- cat
- head
- tail
- wc
@@ -58,6 +68,10 @@ tools:
- grep
- find
- echo
- which
- jq
- type
- file
denied_commands:
- rm -rf /
- sudo
@@ -67,6 +81,9 @@ tools:
filesystem:
max_file_size_bytes: 1048576 # 1 MB
binary_detection: true
cache:
enabled: true
max_entries: 128
session:
session_dir: ".sneakycode/sessions"

View File

@@ -1,144 +0,0 @@
# SneakyCode Implementation Roadmap
A phased plan progressing from bare-bones foundation to full autonomous coding agent.
---
## Phase 1 — Foundation: Models, Config, and Utilities
Establish the data layer and shared infrastructure everything else builds on.
| File | Description |
|------|-------------|
| `app/models/config.py` | Pydantic v2 config model — load and validate `config/config.yaml` |
| `app/models/message.py` | Message schema (role, content, tool_calls) |
| `app/models/tool_call.py` | ToolCall and ToolResult schemas |
| `app/utils/logging.py` | Centralized logger with Rich handler |
| `app/utils/display.py` | Rich console output helpers (stub — expanded in Phase 2) |
| `app/utils/file_helpers.py` | Safe path resolution, binary detection, size guards |
| `app/utils/token_counter.py` | Approximate token usage tracking (character-based heuristic for v1) |
| `app/main.py` | Entrypoint stub — arg parsing, config load, Rich console setup |
**Exit criteria:** `python -m app.main --help` runs, config loads and validates, models can be instantiated and serialized.
---
## Phase 2 — TUI and Interactive Shell
Get a working interactive terminal before wiring up the LLM.
| File | Description |
|------|-------------|
| `app/main.py` | Rich-based interactive REPL loop — prompt for user input, display responses |
| `app/utils/display.py` | Formatted output for agent messages, tool calls, errors, token usage |
| `app/agent/context.py` | Session state and conversation history management |
**Exit criteria:** User can type messages into a styled REPL, see them echoed back with formatting, and conversation history is tracked in memory.
---
## Phase 3 — LLM Integration (Ollama)
Connect to the local LLM and stream responses into the TUI.
| File | Description |
|------|-------------|
| `app/services/llm.py` | Async httpx client wrapping Ollama's OpenAI-compatible `/v1/chat/completions` endpoint |
| `app/services/streaming.py` | SSE parsing, Rich live display, tool call extraction from accumulated stream |
**Integration:** Wire LLM into the REPL — user message goes to LLM, streamed response displays in real time.
**Exit criteria:** User can chat with the local model through the TUI with streamed output. Tool call JSON is parsed from the stream but not yet executed.
---
## Phase 4 — Tool Framework and Core Tools
Build the tool abstraction and implement safe, read-only tools first.
| File | Description |
|------|-------------|
| `app/tools/base.py` | `BaseTool` ABC and `ToolResult` dataclass |
| `app/tools/registry.py` | Tool registration, discovery, and JSON schema export for LLM system prompt |
| `app/services/permissions.py` | Two-tier approval gating (auto-approve reads; prompt for writes/deletes/shell) |
| `app/tools/filesystem.py` | `read_file`, `list_dir` |
| `app/tools/search.py` | `grep_files`, `find_files` |
**Exit criteria:** Tools register themselves, schemas export correctly for inclusion in the system prompt, read-only tools execute and return `ToolResult` objects. Permissions service gates execution.
---
## Phase 5 — Agent Loop (ReAct)
The core autonomy layer — reason, act, observe, repeat.
| File | Description |
|------|-------------|
| `app/agent/loop.py` | ReAct cycle: send conversation to LLM, parse tool calls, execute, feed results back, repeat |
**Key behaviors:**
- System prompt constructed with tool schemas from registry
- Permissions checks before each tool execution
- Loop termination on: plain-text response (no tool calls), explicit `finish` tool call, or `max_iterations` exceeded
**Exit criteria:** Agent can autonomously answer questions about the codebase by chaining `read_file`, `list_dir`, `grep_files`, and `find_files` tool calls in a multi-turn loop.
---
## Phase 6 — Write Tools and Shell
Unlock the agent's ability to modify code and run commands.
| File | Description |
|------|-------------|
| `app/tools/filesystem.py` | `write_file`, `make_dir`, `delete_file` (additions to existing module) |
| `app/tools/edit.py` | `str_replace` (unique-match required), `patch_apply` |
| `app/tools/shell.py` | `run_command` with command allow/deny lists and output truncation |
**All write/shell operations gated through permissions service.**
**Exit criteria:** Agent can autonomously create files, edit code via string replacement, and run shell commands — all with user approval for destructive operations.
---
## Phase 7 — Polish and Hardening
Production-readiness: error handling, resource limits, and documentation.
| Area | Description |
|------|-------------|
| Error handling | Recovery from malformed tool calls, LLM errors, network timeouts in agent loop |
| Token budget | Conversation truncation or summarization when approaching context limit |
| Graceful shutdown | Clean Ctrl+C handling, session state preservation |
| Testing | End-to-end integration tests (`tests/integration/`), unit tests (`tests/unit/`) |
| Documentation | `README.md` with setup and usage instructions, `docs/tools.md` tool reference |
**Exit criteria:** Agent handles edge cases gracefully, tests pass, and a new user can set up and use the project from the README alone.
---
## File Coverage
Every file from the project structure in CLAUDE.md is accounted for:
| File | Phase |
|------|-------|
| `app/main.py` | 1, 2 |
| `app/models/config.py` | 1 |
| `app/models/message.py` | 1 |
| `app/models/tool_call.py` | 1 |
| `app/utils/logging.py` | 1 |
| `app/utils/display.py` | 1, 2 |
| `app/utils/file_helpers.py` | 1 |
| `app/utils/token_counter.py` | 1 |
| `app/agent/context.py` | 2 |
| `app/services/llm.py` | 3 |
| `app/services/streaming.py` | 3 |
| `app/tools/base.py` | 4 |
| `app/tools/registry.py` | 4 |
| `app/services/permissions.py` | 4 |
| `app/tools/filesystem.py` | 4, 6 |
| `app/tools/search.py` | 4 |
| `app/agent/loop.py` | 5 |
| `app/tools/edit.py` | 6 |
| `app/tools/shell.py` | 6 |

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,192 @@
# Textual TUI Redesign — Design Spec
## Overview
Replace the current sequential print-and-scroll terminal UI with a full persistent split-screen TUI using Textual. Input is pinned at the bottom, scrollable message history above, with a header showing app/model info and a footer showing token usage and iteration count.
## Layout
```
+------------------- Header --------------------+
| SneakyCode qwen2.5-coder:32b |
+-----------------------------------------------+
| |
| +--- You ---+ |
| | prompt | <- RichLog widget |
| +-----------+ (handles own scrolling) |
| |
| Thinking... |
| |
| +-- Assistant --+ |
| | response... | |
| +---------------+ |
| |
| > read_file README.md -- 148 lines, 5128 ch |
| > grep_files "pattern" -- 3 matches |
| |
+-----------------------------------------------+
| Tokens: ~1,511 / 32,000 | Iteration 5/25 | <- StatusBar
+-----------------------------------------------+
| > [input cursor] | <- Input widget
+-----------------------------------------------+
```
**Widget hierarchy (no VerticalScroll wrapper — RichLog handles its own scrolling):**
- `Header` — Textual built-in, title="SneakyCode", subtitle=model name
- `RichLog` (id="chat-log") — main scroll area, accepts Rich renderables via `.write()`
- `StreamingStatic` — persistent hidden `Static` widget, shown/hidden during streaming (avoids mount/unmount overhead)
- `StatusBar` — custom `Static` widget, 1 row, docked above Input
- `Input` — Textual built-in, pinned at bottom
## New Files
### `app/ui/app.py` — Textual App
SneakyCodeApp subclasses `textual.app.App`. Responsibilities:
- `compose()` yields: Header, RichLog(id="chat-log"), StreamingStatic(id="streaming"), StatusBar(id="status"), Input
- `on_input_submitted()` handler: reads input value, clears input, writes user panel to chat log, dispatches agent turn as a worker
- Agent turn runs via `run_worker()` (async worker, NOT threaded) so the UI stays responsive. Since the worker is async and on the event loop, widget methods can be called directly — no `call_from_thread()` needed.
- Slash commands (/quit, /history, /clear, /save, /session) parsed from input before dispatching to agent
- Holds references to config, SessionContext, AgentLoop (created in `on_mount`)
- Header subtitle set to model name from config
- `on_worker_state_changed()` handler: catches worker errors and writes error panels to RichLog
- Ctrl+C binding: cancels the running agent worker (does NOT quit the app). A second Ctrl+C or `/quit` exits.
### `app/ui/widgets.py` — Custom Widgets
**StatusBar** — A simple `Static` widget styled as a footer bar. Displays token usage and iteration count. Updated by the agent loop after each LLM step via `status_bar.update(renderable)`.
**StreamingStatic** — A `Static` widget that stays mounted but hidden. During streaming, it becomes visible and receives `update()` calls with partial content. When streaming ends, it is hidden and its content is cleared. This avoids the overhead of mounting/unmounting on every LLM response.
### `app/ui/styles.tcss` — Textual CSS
Layout rules:
- RichLog fills available height (fraction-based sizing, e.g. `height: 1fr`)
- StreamingStatic: `display: none` by default, shown during streaming
- StatusBar is 1 row, docked bottom above Input
- Input is 1 row, docked at very bottom
- Color scheme matches existing SNEAKYCODE_THEME (cyan for user, green for assistant, magenta for tools, dim for metadata)
## Modified Files
### `app/main.py`
- Remove `_run_repl()` async function entirely
- Remove `console.input()` usage
- `main()` creates config, runs preflight via `asyncio.run(_preflight(config))` (before Textual starts — this is fine, separate event loop), then instantiates and runs `SneakyCodeApp(config).run()`
- CLI arg parsing stays (--config, -v, --log-file)
- Session resume: `_offer_session_resume()` moves into `SneakyCodeApp.on_mount()` — instead of `console.input()`, push a modal screen asking "Resume previous session? [y/n]" with button/key handlers
- Auto-save: triggers after each agent turn completes (in the worker completion handler)
- SIGTERM handler: removed — Textual manages its own signal handling and shutdown lifecycle
### `app/services/streaming.py`
- Remove `from rich.live import Live` and `from rich.spinner import Spinner`
- `process_stream()` no longer creates a `Rich.Live` context
- Instead, accepts callback parameters:
- `on_content: Callable[[str], None]` — called with accumulated content on each content chunk
- `on_thinking: Callable[[], None]` — called once when first reasoning token arrives
- `on_done: Callable[[], None]` — called when streaming completes
- **Throttling:** Content callback fires at most every 100ms (track last update time, skip intermediate chunks). Final content always fires on stream end.
- Since the agent runs as an async worker (on the event loop), callbacks can directly call widget methods — no `call_from_thread()` needed.
- All accumulation and tool-call parsing logic stays identical
### `app/utils/display.py`
- All `print_*` functions become `render_*` functions that return Rich renderables:
- `render_user_message(content) -> Panel`
- `render_assistant_message(content) -> Panel`
- `render_tool_call(name, args) -> Text`
- `render_tool_result(name, output, is_error) -> Text`
- `render_iteration_header(iteration, max_iter) -> Text`
- `render_warning(message) -> Text`
- `render_error(message) -> Text`
- `print_banner()` removed — Header widget replaces it
- `print_token_usage()` becomes `render_token_usage() -> Text` for the StatusBar
- `print_history()` becomes `render_history() -> Table` — written to RichLog, may need width constraints for narrow terminals
- A `DisplayAdapter` class wraps a `RichLog` reference and provides `write_user_message()`, `write_tool_call()`, etc. methods that call `render_*` then `rich_log.write()`
### `app/agent/loop.py`
- `AgentLoop.__init__()` accepts a `DisplayAdapter` instead of calling `display.py` print functions directly
- All display calls route through the adapter: `self._display.write_tool_call(name, args)`, `self._display.write_iteration_header(i, max)`, etc.
- `_execute_tool_calls()` becomes `async def _execute_tool_calls()` to support async permission checks
- The loop logic (ReAct pattern, retry, truncation) is unchanged
### `app/services/permissions.py`
- `PermissionsService.check()` becomes `async def check()`
- Instead of `rich.prompt.Confirm.ask()` (blocking stdin read), it:
1. Creates an `asyncio.Event`
2. Posts a custom message to the app requesting a permission modal
3. The app pushes a modal screen with the permission question and approve/deny buttons
4. When the user responds, the modal sets the event and stores the result
5. `check()` awaits the event and reads the result
- Edge cases: dismiss without choosing = deny. Ctrl+C during modal = deny. Focus returns to Input after modal dismisses.
### `app/utils/logging.py`
- **Critical change:** The shared `console = Console()` instance will corrupt the Textual display since Textual takes exclusive terminal control
- When running under Textual: disable `RichHandler` (console handler), keep only the file handler
- Add a `setup_logging_for_tui()` function that reconfigures logging to file-only mode
- Called from `SneakyCodeApp.on_mount()` before any agent work begins
- The `console` object still exists but should not be used for output during TUI mode — all output goes through the DisplayAdapter
- Consider: `--log-file` becomes required (or auto-set to a default) when running in TUI mode, so logs are not lost
## Unchanged Files
- `app/services/llm.py` — HTTP client, SSE parsing untouched
- `app/agent/context.py` — session state untouched
- `app/models/*` — all data models untouched
- `app/tools/*` — all tool implementations untouched
- `app/utils/file_helpers.py` — path safety untouched
- `app/utils/token_counter.py` — token counting untouched
## Key Patterns
### Streaming in Textual
The agent loop runs as an async worker (on the event loop, NOT threaded). During streaming:
1. App shows `StreamingStatic` widget, writes "Thinking..." initially
2. Worker calls `StreamHandler.process_stream(chunks, on_content=..., on_thinking=..., on_done=...)`
3. `on_content` callback: updates `StreamingStatic` with `Panel(Markdown(partial_content), title="Assistant", border_style="green")` — throttled to ~100ms intervals
4. `on_done` callback: hides `StreamingStatic`, writes final content to `RichLog` via `DisplayAdapter`
Since the worker is async (not threaded), callbacks run on the event loop and can call widget methods directly.
### Permission Prompts
1. Agent loop (in async worker) calls `await permissions.check(operation, details)`
2. `check()` creates an `asyncio.Event` and posts `PermissionRequest` message to the app
3. App handles `PermissionRequest`: pushes a modal screen with the question, approve/deny buttons
4. Modal screen: on button press, stores result and sets the event
5. `check()` awaits the event, reads result, returns approved/denied
6. Focus management: Input loses focus when modal appears, regains focus when modal dismisses
7. Default on dismiss/Ctrl+C: deny
### Cancellation
- Ctrl+C (first press): cancels the running agent worker via `worker.cancel()`. The agent loop should check for cancellation between iterations.
- Ctrl+C (second press) or `/quit`: exits the app via `app.exit()`
## Dependencies
- Add `textual>=4.0.0` to pyproject.toml dependencies
## Verification
1. Run the app — header shows app name + model, no console corruption
2. Type a prompt — user panel appears in scroll area, input clears
3. During LLM streaming — assistant response types out live (throttled) in the scroll area
4. Thinking indicator shows during reasoning-only phases
5. Tool calls appear as compact lines in the scroll area
6. Footer shows token usage and iteration count, updating each step
7. Scroll area auto-scrolls to bottom on new content
8. /quit, /clear, /history commands work from the input
9. Permission prompts show as modal, approve/deny work, focus returns to input
10. Ctrl+C cancels running agent turn without quitting
11. Worker errors display as error panels in the scroll area
12. Logging goes to file only — no console corruption
13. Session resume works on startup via modal dialog

View File

@@ -1 +1,12 @@
Pressing up should cycle history like claude code.
# UI Issues
on /clear we need to reset the token counter in the header panel.
# Bugs
# Improvements
add -p to command line args so that the agent can run the prompt and return data directly via STDOUT
# Open questions:
How might we pass a directory to this app and have it use that directory as it's workspace so I don't have to copy files or do odd things to work in other directories.
How do we handle huge files not taking up so many tokens?

View File

@@ -21,10 +21,10 @@ from app.utils.display import (
class TestRenderFunctions:
def test_render_user_message_returns_panel(self) -> None:
def test_render_user_message_returns_text(self) -> None:
result = render_user_message("hello")
assert isinstance(result, Panel)
assert result.title == "You"
assert isinstance(result, Text)
assert "hello" in result.plain
def test_render_assistant_message_returns_panel(self) -> None:
result = render_assistant_message("response")
@@ -72,7 +72,7 @@ class TestDisplayAdapter:
adapter.write_user_message("hello")
mock_log.write.assert_called_once()
arg = mock_log.write.call_args[0][0]
assert isinstance(arg, Panel)
assert isinstance(arg, Text)
def test_write_tool_call(self) -> None:
mock_log = MagicMock()

View File

@@ -0,0 +1,314 @@
"""Tests for the file cache with LRU eviction and mtime invalidation."""
import os
import time
from pathlib import Path
import pytest
from app.models.config import AppConfig, load_config
from app.models.tool_call import ToolResultStatus
from app.tools.filesystem import ReadFileTool, ReadManyFilesTool
from app.utils.file_cache import CacheStats, FileCache, cached_read_file
from app.utils.file_helpers import BinaryFileError, FileSizeError, PathSecurityError
# ---------------------------------------------------------------------------
# FileCache unit tests
# ---------------------------------------------------------------------------
class TestFileCache:
def test_put_and_get_roundtrip(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "hello.txt"
f.write_text("hello world")
cache.put(f, "hello world")
assert cache.get(f) == "hello world"
def test_get_returns_none_for_missing_key(self, tmp_path: Path) -> None:
cache = FileCache()
assert cache.get(tmp_path / "nope.txt") is None
def test_mtime_change_causes_miss(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "data.txt"
f.write_text("v1")
cache.put(f, "v1")
# Mutate the file so mtime changes
time.sleep(0.05) # ensure mtime differs
f.write_text("v2")
assert cache.get(f) is None # stale → miss
assert cache.stats.invalidations == 1
def test_lru_eviction_at_capacity(self, tmp_path: Path) -> None:
cache = FileCache(max_entries=3)
files = []
for i in range(4):
f = tmp_path / f"f{i}.txt"
f.write_text(f"content-{i}")
files.append(f)
# Fill cache to capacity
for f in files[:3]:
cache.put(f, f.read_text())
assert len(cache) == 3
# Adding a 4th evicts the LRU (files[0])
cache.put(files[3], files[3].read_text())
assert len(cache) == 3
assert cache.get(files[0]) is None # evicted
assert cache.stats.evictions == 1
# files[1..3] still present
for f in files[1:]:
assert cache.get(f) is not None
def test_invalidate_removes_entry(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "rm.txt"
f.write_text("bye")
cache.put(f, "bye")
assert len(cache) == 1
cache.invalidate(f)
assert len(cache) == 0
assert cache.get(f) is None
assert cache.stats.invalidations == 1
def test_invalidate_noop_for_missing(self) -> None:
cache = FileCache()
cache.invalidate(Path("/nonexistent"))
assert cache.stats.invalidations == 0
def test_clear_empties_cache(self, tmp_path: Path) -> None:
cache = FileCache()
for i in range(5):
f = tmp_path / f"c{i}.txt"
f.write_text(str(i))
cache.put(f, str(i))
assert len(cache) == 5
cache.clear()
assert len(cache) == 0
def test_stats_accuracy(self, tmp_path: Path) -> None:
cache = FileCache(max_entries=2)
a = tmp_path / "a.txt"
b = tmp_path / "b.txt"
c = tmp_path / "c.txt"
a.write_text("a")
b.write_text("b")
c.write_text("c")
# Miss
cache.get(a)
assert cache.stats.misses == 1
assert cache.stats.hits == 0
# Put + hit
cache.put(a, "a")
cache.get(a)
assert cache.stats.hits == 1
# Fill + evict
cache.put(b, "b")
cache.put(c, "c") # evicts a
assert cache.stats.evictions == 1
def test_hit_rate(self) -> None:
stats = CacheStats(hits=3, misses=1)
assert stats.hit_rate == pytest.approx(0.75)
def test_hit_rate_zero_total(self) -> None:
stats = CacheStats()
assert stats.hit_rate == 0.0
def test_file_deleted_after_caching(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "gone.txt"
f.write_text("here")
cache.put(f, "here")
f.unlink()
assert cache.get(f) is None # stat fails → miss
def test_put_skips_when_stat_fails(self) -> None:
cache = FileCache()
cache.put(Path("/totally/nonexistent"), "data")
assert len(cache) == 0
def test_get_moves_to_end(self, tmp_path: Path) -> None:
"""Accessing an entry makes it most-recently-used, protecting from eviction."""
cache = FileCache(max_entries=3)
files = []
for i in range(3):
f = tmp_path / f"lru{i}.txt"
f.write_text(f"c{i}")
files.append(f)
cache.put(f, f"c{i}")
# Touch files[0] to make it MRU
cache.get(files[0])
# Add a new entry — files[1] (LRU) should be evicted, not files[0]
extra = tmp_path / "extra.txt"
extra.write_text("x")
cache.put(extra, "x")
assert cache.get(files[0]) is not None # protected by access
assert cache.get(files[1]) is None # evicted
# ---------------------------------------------------------------------------
# cached_read_file tests
# ---------------------------------------------------------------------------
class TestCachedReadFile:
def test_without_cache_matches_safe_read(self, tmp_path: Path) -> None:
f = tmp_path / "plain.txt"
f.write_text("hello")
content = cached_read_file(f, tmp_path, cache=None)
assert content == "hello"
def test_populates_on_miss_returns_on_hit(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "cached.txt"
f.write_text("data")
# First call: miss → read from disk → populate cache
content1 = cached_read_file(f, tmp_path, cache=cache)
assert content1 == "data"
assert cache.stats.misses == 1
assert cache.stats.hits == 0
# Second call: hit → from cache
content2 = cached_read_file(f, tmp_path, cache=cache)
assert content2 == "data"
assert cache.stats.hits == 1
def test_security_checks_run_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
with pytest.raises(PathSecurityError):
cached_read_file("/etc/passwd", tmp_path, cache=cache)
def test_binary_check_runs_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "bin.dat"
f.write_bytes(b"\x00binary\x00")
with pytest.raises(BinaryFileError):
cached_read_file(f, tmp_path, cache=cache)
def test_size_check_runs_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "big.txt"
f.write_text("x" * 200)
# First read populates cache
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
# Now make file too big on disk — security check should catch it
# even though content is cached
f.write_text("x" * 2000)
with pytest.raises(FileSizeError):
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
def test_file_not_found(self, tmp_path: Path) -> None:
cache = FileCache()
with pytest.raises(FileNotFoundError):
cached_read_file(tmp_path / "nope.txt", tmp_path, cache=cache)
# ---------------------------------------------------------------------------
# Tool-level cache-hit dedup tests
# ---------------------------------------------------------------------------
@pytest.fixture
def config() -> AppConfig:
return load_config()
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
config.agent.workspace_root = tmp_path
return tmp_path, config
class TestReadFileToolCacheHit:
def test_first_read_returns_full_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "hello.txt").write_text("hello world")
tool = ReadFileTool(ws, cfg, file_cache=cache)
result = tool.run("tc-1", {"file_path": "hello.txt"})
assert result.status == ToolResultStatus.SUCCESS
assert result.output == "hello world"
def test_second_read_returns_cached_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "hello.txt").write_text("hello world")
tool = ReadFileTool(ws, cfg, file_cache=cache)
tool.run("tc-1", {"file_path": "hello.txt"})
result2 = tool.run("tc-2", {"file_path": "hello.txt"})
assert result2.status == ToolResultStatus.SUCCESS
assert "[Cached]" in result2.output
assert "hello.txt" in result2.output
assert "hello world" not in result2.output
def test_changed_file_returns_full_content_again(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
f = ws / "data.txt"
f.write_text("v1")
tool = ReadFileTool(ws, cfg, file_cache=cache)
tool.run("tc-1", {"file_path": "data.txt"})
# Mutate file so mtime changes
time.sleep(0.05)
f.write_text("v2")
result2 = tool.run("tc-2", {"file_path": "data.txt"})
assert result2.status == ToolResultStatus.SUCCESS
assert result2.output == "v2"
assert "[Cached]" not in result2.output
def test_no_cache_always_returns_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "hello.txt").write_text("hello")
tool = ReadFileTool(ws, cfg, file_cache=None)
r1 = tool.run("tc-1", {"file_path": "hello.txt"})
r2 = tool.run("tc-2", {"file_path": "hello.txt"})
assert r1.output == "hello"
assert r2.output == "hello"
class TestReadManyFilesToolCacheHit:
def test_cached_files_get_short_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "a.txt").write_text("alpha")
(ws / "b.txt").write_text("bravo")
tool = ReadManyFilesTool(ws, cfg, file_cache=cache)
# First read — full content
r1 = tool.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})
assert "alpha" in r1.output
assert "bravo" in r1.output
# Second read — cached messages
r2 = tool.run("tc-2", {"file_paths": ["a.txt", "b.txt"]})
assert "[Cached]" in r2.output
assert "alpha" not in r2.output
assert "bravo" not in r2.output

View File

@@ -0,0 +1,69 @@
"""Tests for the read_many_files tool."""
from pathlib import Path
import pytest
from app.models.config import AppConfig, load_config
from app.models.tool_call import ToolResultStatus
from app.tools.filesystem import ReadManyFilesTool
@pytest.fixture
def config() -> AppConfig:
return load_config()
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
"""Create a temporary workspace for read_many_files tests."""
config.agent.workspace_root = tmp_path
return tmp_path, config
class TestReadManyFilesTool:
def test_read_multiple_files(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "a.txt").write_text("alpha")
(ws / "b.txt").write_text("bravo")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})
assert result.status == ToolResultStatus.SUCCESS
assert "=== a.txt ===" in result.output
assert "alpha" in result.output
assert "=== b.txt ===" in result.output
assert "bravo" in result.output
def test_partial_failure(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "exists.txt").write_text("hello")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-2", {"file_paths": ["exists.txt", "missing.txt"]})
assert result.status == ToolResultStatus.SUCCESS
assert "hello" in result.output
assert "[ERROR]" in result.output
assert "=== missing.txt ===" in result.output
def test_all_files_fail(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-3", {"file_paths": ["no1.txt", "no2.txt"]})
assert result.status == ToolResultStatus.ERROR
assert "All files failed" in (result.error or "")
def test_empty_file_paths(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-4", {"file_paths": []})
assert result.status == ToolResultStatus.ERROR
assert "empty" in (result.error or "").lower()
def test_path_security_inline_error(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "safe.txt").write_text("ok")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-5", {"file_paths": ["safe.txt", "../../etc/passwd"]})
assert result.status == ToolResultStatus.SUCCESS
assert "ok" in result.output
assert "[ERROR]" in result.output
assert "outside" in result.output.lower()

View File

@@ -90,6 +90,6 @@ class TestRunCommandTool:
# Create a file in the workspace to verify cwd
(ws / "marker.txt").write_text("found")
tool = RunCommandTool(ws, cfg)
result = tool.run("tc-9", {"command": "cat marker.txt"})
result = tool.run("tc-9", {"command": "head marker.txt"})
assert result.status == ToolResultStatus.SUCCESS
assert "found" in result.output

View File

@@ -108,7 +108,7 @@ class TestToolRegistry:
registry = create_default_registry(workspace, config)
names = set(registry.get_all().keys())
assert names == {
"read_file", "list_dir", "grep_files", "find_files",
"read_file", "read_many_files", "list_dir", "grep_files", "find_files",
"write_file", "make_dir", "delete_file",
"str_replace", "patch_apply",
"run_command",
@@ -118,7 +118,7 @@ class TestToolRegistry:
def test_schema_export(self, workspace: Path, config: AppConfig) -> None:
registry = create_default_registry(workspace, config)
schemas = registry.get_openai_tools_schema()
assert len(schemas) == 11
assert len(schemas) == 12
assert all(s["type"] == "function" for s in schemas)