Compare commits
21 Commits
25fa7dc82b
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| 6145a23296 | |||
| 16d79df421 | |||
| 1ee721ac10 | |||
| d54a3480b8 | |||
| d3b286ba40 | |||
| d829e6553c | |||
| 2c532adbbc | |||
| be1ea81102 | |||
| 3fe0f7af47 | |||
| 05754fe06b | |||
| 0886727437 | |||
| 638aecb561 | |||
| b878408f3e | |||
| 5b5c3098bb | |||
| 4e3da84578 | |||
| 2ad3df521d | |||
| 4496fce354 | |||
| 133bcbda57 | |||
| 7705008b9c | |||
| 9273d14845 | |||
| f0d8ef8f0a |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -34,3 +34,6 @@ htmlcov/
|
||||
|
||||
# Worktrees
|
||||
.worktrees/
|
||||
|
||||
# SneakyCode local data
|
||||
.sneakycode/
|
||||
|
||||
90
README.md
90
README.md
@@ -26,14 +26,19 @@ pip install -e ".[dev]"
|
||||
|
||||
## Configuration
|
||||
|
||||
Edit `config/config.yaml` to configure the agent. Key settings:
|
||||
Edit `config/config.yaml` to configure the agent. The full configuration reference:
|
||||
|
||||
```yaml
|
||||
llm:
|
||||
model: "qwen3.5:latest" # Ollama model name
|
||||
endpoint: "http://localhost:11434" # Ollama endpoint
|
||||
api_path: "/v1/chat/completions" # API endpoint path
|
||||
temperature: 0.1 # Sampling temperature
|
||||
max_tokens: 4096 # Maximum tokens in LLM response
|
||||
timeout: 120 # Request timeout in seconds
|
||||
max_retries: 3 # Retry attempts on transient errors
|
||||
retry_backoff_base: 1.0 # Exponential backoff base (seconds)
|
||||
retry_backoff_max: 30.0 # Maximum backoff seconds
|
||||
|
||||
agent:
|
||||
max_iterations: 25 # Max tool-call iterations per turn
|
||||
@@ -43,6 +48,7 @@ agent:
|
||||
truncation_threshold: 0.85 # Budget fraction that triggers truncation
|
||||
|
||||
session:
|
||||
session_dir: ".sneakycode/sessions" # Directory for session files
|
||||
auto_save: true # Save session after each turn
|
||||
max_session_age_hours: 72 # Auto-cleanup old sessions
|
||||
offer_resume: true # Offer to resume on startup
|
||||
@@ -51,6 +57,48 @@ permissions:
|
||||
auto_approve: [read_file, list_dir, grep_files, find_files, finish]
|
||||
prompt_user: [write_file, delete_file, run_command, str_replace, patch_apply, make_dir]
|
||||
deny: []
|
||||
|
||||
tools:
|
||||
shell:
|
||||
allowed_commands: # Commands the LLM may run
|
||||
- git
|
||||
- python
|
||||
- pip
|
||||
- pytest
|
||||
- ruff
|
||||
- ls
|
||||
- cat
|
||||
- head
|
||||
- tail
|
||||
- wc
|
||||
- diff
|
||||
- grep
|
||||
- find
|
||||
- echo
|
||||
denied_commands: # Blocked commands
|
||||
- rm -rf /
|
||||
- sudo
|
||||
- curl
|
||||
- wget
|
||||
max_output_bytes: 65536 # Max captured output size (bytes)
|
||||
filesystem:
|
||||
max_file_size_bytes: 1048576 # 1 MB — max file size for read/write
|
||||
binary_detection: true # Detect and reject binary files
|
||||
|
||||
display:
|
||||
show_tool_calls: true # Show tool call details in output
|
||||
show_token_usage: true # Show token usage stats
|
||||
stream_output: true # Stream LLM output to terminal
|
||||
|
||||
skills:
|
||||
enabled: true # Enable the skills system
|
||||
directories: # Directories to scan for skill files
|
||||
- ".sneakycode/skills"
|
||||
|
||||
debug:
|
||||
enabled: false # Enable debug logging
|
||||
log_dir: ".sneakycode/logs" # Debug log directory
|
||||
max_files: 10 # Max debug log files to retain
|
||||
```
|
||||
|
||||
The `SNEAKYCODE_CONFIG` environment variable overrides the config file path.
|
||||
@@ -58,9 +106,12 @@ Environment variable `SNEAKYCODE_CONFIG` can override the config file path.
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Start the interactive REPL
|
||||
# Start the interactive TUI
|
||||
sneakycode
|
||||
|
||||
# Open a specific project directory
|
||||
sneakycode /path/to/project
|
||||
|
||||
# Or run directly
|
||||
python -m app.main
|
||||
|
||||
@@ -68,25 +119,38 @@ python -m app.main
|
||||
sneakycode --config path/to/config.yaml --verbose --log-file sneakycode.log
|
||||
```
|
||||
|
||||
### CLI Options
|
||||
|
||||
| Option | Description |
|
||||
|--------------------------|--------------------------------------------------|
|
||||
| `DIRECTORY` | Project directory to use as workspace root |
|
||||
| `--config PATH` | Path to config YAML file (default: `config/config.yaml`) |
|
||||
| `-v`, `--verbose` | Enable verbose (DEBUG) logging |
|
||||
| `--log-file PATH` | Path to log file for persistent logging |
|
||||
|
||||
### REPL Commands
|
||||
|
||||
| Command | Description |
|
||||
|------------|--------------------------------------|
|
||||
| `/quit` | Save session and exit |
|
||||
|-------------------|----------------------------------------------------|
|
||||
| `/help` | Show available commands |
|
||||
| `/quit` | Save session and exit (also `/exit`, `/bye`) |
|
||||
| `/history` | Show conversation history |
|
||||
| `/clear` | Clear conversation history |
|
||||
| `/save` | Manually save session |
|
||||
| `/session` | Show session info (messages, tokens) |
|
||||
| `/session` | Show session info (messages, tokens, start time) |
|
||||
| `/models` | List available Ollama models |
|
||||
| `/models <name>` | Switch to a different model |
|
||||
| `/skills` | List available skills |
|
||||
|
||||
### Session Persistence
|
||||
|
||||
Sessions are automatically saved after each agent turn and on exit. On startup, SneakyCode offers to resume the most recent session for the current workspace.
|
||||
|
||||
Session files are stored in `.sneakycode/sessions/` within the workspace root.
|
||||
Session files are stored in `.sneakycode/sessions/` within the workspace root (configurable via `session.session_dir`).
|
||||
|
||||
## Available Tools
|
||||
|
||||
SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools.md) for the full reference.
|
||||
SneakyCode provides tools across 6 categories. See [docs/tools.md](docs/tools.md) for the full reference.
|
||||
|
||||
| Category | Tools | Permission |
|
||||
|------------|-------------------------------------------------|---------------|
|
||||
@@ -96,6 +160,17 @@ SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools
|
||||
| Edit | `str_replace`, `patch_apply` | User confirm |
|
||||
| Shell | `run_command` | User confirm |
|
||||
| Control | `finish` | Auto-approved |
|
||||
| Skills | `load_skill` | Auto-approved |
|
||||
|
||||
The `load_skill` tool is available when `skills.enabled` is `true` in the config. It allows the LLM to load skill instructions from the configured skill directories.
|
||||
|
||||
## Skills
|
||||
|
||||
SneakyCode includes a skills system that lets you provide reusable instruction sets to the LLM. Skills are Markdown files placed in `.sneakycode/skills/` (or any directory listed in `skills.directories`).
|
||||
|
||||
Skills are auto-discovered on startup. The LLM can load them via the `load_skill` tool, and you can list available skills with the `/skills` command.
|
||||
|
||||
To create a skill, add a `.md` file to your skills directory with a descriptive filename (e.g., `refactoring.md`). The file content is injected into the conversation when the skill is loaded.
|
||||
|
||||
## Development
|
||||
|
||||
@@ -121,6 +196,7 @@ app/
|
||||
├── models/ # Pydantic config and message schemas
|
||||
├── services/ # LLM client, streaming, permissions, session persistence
|
||||
├── tools/ # Tool implementations (one file per group)
|
||||
├── ui/ # Textual TUI application and widgets
|
||||
└── utils/ # Logging, display, file helpers, token counter
|
||||
config/
|
||||
└── config.yaml # Application configuration
|
||||
|
||||
@@ -7,7 +7,7 @@ import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from app.agent.context import SessionContext
|
||||
from app.models.config import AppConfig
|
||||
from app.models.config import AgentMode, AppConfig
|
||||
from app.models.message import Message
|
||||
from app.models.tool_call import ToolCall, ToolResult, ToolResultStatus
|
||||
from app.services.llm import LLMClient, LLMConnectionError, LLMError, LLMStreamError
|
||||
@@ -59,6 +59,12 @@ class AgentLoop:
|
||||
self._skills = skills_manager
|
||||
self._skill_runner = skill_runner
|
||||
self._tools_schema = registry.get_openai_tools_schema()
|
||||
if self._permissions.mode == AgentMode.PLAN:
|
||||
read_only = PermissionsService.READ_ONLY_TOOLS
|
||||
self._tools_schema = [
|
||||
t for t in self._tools_schema
|
||||
if t["function"]["name"] in read_only
|
||||
]
|
||||
self._system_prompt = self._build_system_prompt()
|
||||
self._cancelled = False
|
||||
|
||||
@@ -89,12 +95,47 @@ class AgentLoop:
|
||||
f"\n\nCurrently active skill: {self._skill_runner.active_skill_name}. "
|
||||
"When the skill's objective is complete, call the `finish_skill` tool."
|
||||
)
|
||||
if self._permissions.mode == AgentMode.PLAN:
|
||||
prompt += (
|
||||
"\n\nYou are in PLAN mode. You may only use read-only tools: "
|
||||
"read_file, list_dir, grep_files, find_files, finish. "
|
||||
"Do NOT attempt to write files, edit code, or run commands. "
|
||||
"Instead, describe what changes you would make, which files "
|
||||
"you would modify, and provide the reasoning for each change."
|
||||
)
|
||||
return prompt
|
||||
|
||||
# Models whose chat templates understand /no_think directives.
|
||||
_THINKING_MODEL_PREFIXES = ("qwen", "qwq")
|
||||
|
||||
def _model_supports_no_think(self) -> bool:
|
||||
"""Check if the current model uses a thinking chat template."""
|
||||
model_lower = self._config.llm.model.lower()
|
||||
return any(model_lower.startswith(p) for p in self._THINKING_MODEL_PREFIXES)
|
||||
|
||||
def _get_messages_with_system_prompt(self) -> list[Message]:
    """Prepend the system prompt to conversation history.

    When thinking is disabled on a model that supports it, inserts a
    system-level /no_think directive after the last user message so
    Qwen 3.x (and similar) chat templates see it.

    Returns:
        A new list starting with the system prompt message, followed by
        the conversation history (plus the optional /no_think message).
    """
    system_msg = Message(role="system", content=self._system_prompt)
    history = self._ctx.get_history()

    if not self._config.llm.thinking and self._model_supports_no_think() and history:
        # Work on a copy so the inserted hint never leaks into the
        # list returned by the session context.
        history = list(history)
        # Walk backwards to find the most recent user message and
        # place the hint directly after it.
        for idx in range(len(history) - 1, -1, -1):
            if history[idx].role == "user":
                history.insert(idx + 1, Message(role="system", content="/no_think"))
                break

    return [system_msg] + history
|
||||
|
||||
async def run_turn(self, user_input: str) -> None:
|
||||
"""Execute one full agent turn: add user message, loop until done.
|
||||
@@ -107,6 +148,7 @@ class AgentLoop:
|
||||
|
||||
max_iter = self._config.agent.max_iterations
|
||||
reasoning_only_streak = 0
|
||||
empty_streak = 0
|
||||
for iteration in range(1, max_iter + 1):
|
||||
if self._cancelled:
|
||||
if self._display:
|
||||
@@ -161,6 +203,10 @@ class AgentLoop:
|
||||
reasoning_only_streak += 1
|
||||
self._ctx.pop_last_message()
|
||||
|
||||
# When thinking is disabled, reasoning-only is expected model noise.
|
||||
# Nudge immediately and silently to avoid wasting iterations.
|
||||
thinking_disabled = not self._config.llm.thinking
|
||||
|
||||
# If the last context messages are tool errors, nudge immediately
|
||||
# rather than wasting retries — the model is likely confused by the error.
|
||||
has_recent_tool_error = any(
|
||||
@@ -168,9 +214,14 @@ class AgentLoop:
|
||||
for m in self._ctx.get_history()[-3:]
|
||||
)
|
||||
|
||||
if has_recent_tool_error or reasoning_only_streak >= _MAX_REASONING_RETRIES:
|
||||
# Nudge the model by injecting a user hint
|
||||
if self._display:
|
||||
should_nudge = (
|
||||
thinking_disabled
|
||||
or has_recent_tool_error
|
||||
or reasoning_only_streak >= _MAX_REASONING_RETRIES
|
||||
)
|
||||
|
||||
if should_nudge:
|
||||
if not thinking_disabled and self._display:
|
||||
self._display.write_warning(
|
||||
f"Model produced reasoning but no response {reasoning_only_streak} times. "
|
||||
"Nudging model to respond..."
|
||||
@@ -188,10 +239,42 @@ class AgentLoop:
|
||||
# Successful response — reset streak
|
||||
reasoning_only_streak = 0
|
||||
|
||||
# No tool calls → task complete (plain text response)
|
||||
if not assistant_msg.tool_calls:
|
||||
# Detect completely empty response (no content, no tool calls)
|
||||
if not assistant_msg.content and not assistant_msg.tool_calls:
|
||||
empty_streak += 1
|
||||
self._ctx.pop_last_message() # Don't keep empty messages
|
||||
if empty_streak >= 2:
|
||||
if self._display:
|
||||
self._display.write_warning(
|
||||
"Model returned repeated empty responses — "
|
||||
"try a different model or check Ollama logs."
|
||||
)
|
||||
break
|
||||
if self._display:
|
||||
self._display.write_warning("Model returned empty response. Retrying without tools...")
|
||||
# Retry without tool schemas — some models return empty when
|
||||
# tools are in the payload but the model can't handle them.
|
||||
assistant_msg = await self._llm_step(skip_tools=True)
|
||||
if assistant_msg is None:
|
||||
break
|
||||
if assistant_msg.content:
|
||||
self._ctx.add_message("assistant", assistant_msg.content)
|
||||
if self._display:
|
||||
self._display.write_assistant_message(assistant_msg.content)
|
||||
self._handler.reset()
|
||||
break
|
||||
# Still empty even without tools
|
||||
self._handler.reset()
|
||||
continue
|
||||
|
||||
empty_streak = 0 # reset on successful non-empty response
|
||||
|
||||
# Display any assistant text content (even if tool calls follow)
|
||||
if self._display and assistant_msg.content:
|
||||
self._display.write_assistant_message(assistant_msg.content)
|
||||
|
||||
# No tool calls → task complete (plain text response)
|
||||
if not assistant_msg.tool_calls:
|
||||
break
|
||||
|
||||
# Execute tool calls
|
||||
@@ -219,21 +302,25 @@ class AgentLoop:
|
||||
if self._display:
|
||||
self._display.write_warning(f"Agent reached maximum iterations ({max_iter}). Stopping.")
|
||||
|
||||
async def _llm_step(self) -> Message | None:
|
||||
async def _llm_step(self, *, skip_tools: bool = False) -> Message | None:
|
||||
"""Stream one LLM response and return the accumulated Message.
|
||||
|
||||
Uses retry-enabled streaming. On mid-stream errors, attempts to recover
|
||||
partial content if available.
|
||||
|
||||
Args:
|
||||
skip_tools: If True, send the request without tool schemas (fallback mode).
|
||||
|
||||
Returns:
|
||||
The assistant Message, or None if an error occurred.
|
||||
"""
|
||||
messages = self._get_messages_with_system_prompt()
|
||||
if self._debug:
|
||||
self._debug.log_request(messages, self._config.llm.model)
|
||||
tools = None if skip_tools else self._tools_schema
|
||||
t0 = time.monotonic()
|
||||
try:
|
||||
chunk_iter = self._client.stream_chat_with_retry(messages, tools=self._tools_schema)
|
||||
chunk_iter = self._client.stream_chat_with_retry(messages, tools=tools)
|
||||
result = await self._handler.process_stream(chunk_iter)
|
||||
if result and self._debug:
|
||||
elapsed = (time.monotonic() - t0) * 1000
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Pydantic configuration models mapping to config/config.yaml."""
|
||||
|
||||
import os
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -8,6 +9,31 @@ import yaml
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
|
||||
class AgentMode(StrEnum):
|
||||
"""Runtime agent mode controlling permission behavior."""
|
||||
|
||||
NORMAL = "normal"
|
||||
PLAN = "plan"
|
||||
AUTO = "auto"
|
||||
|
||||
|
||||
class ModelProfile(BaseModel):
    """Per-model overrides applied when switching models.

    Every field defaults to ``None``, meaning "no override": only
    non-``None`` values are copied onto the active configuration by
    ``AppConfig.apply_model_profile``.
    """

    # Overrides ``agent.max_conversation_tokens``.
    max_conversation_tokens: int | None = Field(
        default=None, description="Token budget override for this model's context window"
    )
    # Overrides ``llm.thinking``.
    thinking: bool | None = Field(
        default=None, description="Override thinking mode for this model"
    )
    # Overrides ``llm.temperature``.
    temperature: float | None = Field(
        default=None, description="Override sampling temperature"
    )
    # Overrides ``llm.max_tokens``.
    max_tokens: int | None = Field(
        default=None, description="Override max response tokens"
    )
|
||||
|
||||
|
||||
class LLMConfig(BaseModel):
|
||||
"""LLM backend configuration."""
|
||||
|
||||
@@ -20,6 +46,10 @@ class LLMConfig(BaseModel):
|
||||
max_retries: int = Field(default=3, description="Max retry attempts on transient errors")
|
||||
retry_backoff_base: float = Field(default=1.0, description="Base seconds for exponential backoff")
|
||||
retry_backoff_max: float = Field(default=30.0, description="Maximum backoff seconds")
|
||||
thinking: bool = Field(
|
||||
default=True,
|
||||
description="Enable model thinking/reasoning mode (disable to reduce reasoning-only loops)",
|
||||
)
|
||||
extra_body: dict[str, Any] = Field(
|
||||
default_factory=dict,
|
||||
description="Extra parameters merged into the API request body (model-specific)",
|
||||
@@ -60,11 +90,19 @@ class ShellToolConfig(BaseModel):
|
||||
max_output_bytes: int = Field(default=65536, description="Max output capture size in bytes")
|
||||
|
||||
|
||||
class FileCacheConfig(BaseModel):
    """File cache configuration."""

    # Master switch for file content caching.
    enabled: bool = Field(default=True, description="Enable file content caching")
    # Upper bound on cached entries; per the description, eviction is LRU.
    max_entries: int = Field(default=128, description="Maximum cached file entries (LRU eviction)")
|
||||
|
||||
|
||||
class FilesystemToolConfig(BaseModel):
    """Filesystem tool limits."""

    # 1_048_576 bytes = 1 MiB.
    max_file_size_bytes: int = Field(default=1_048_576, description="Max file size for read/write")
    binary_detection: bool = Field(default=True, description="Detect and reject binary files")
    # Nested cache settings; a fresh FileCacheConfig is built per instance.
    cache: FileCacheConfig = Field(default_factory=FileCacheConfig, description="File cache settings")
|
||||
|
||||
|
||||
class ToolsConfig(BaseModel):
|
||||
@@ -124,6 +162,10 @@ class AppConfig(BaseModel):
|
||||
session: SessionConfig = Field(default_factory=SessionConfig)
|
||||
debug: DebugConfig = Field(default_factory=DebugConfig)
|
||||
skills: SkillsConfig = Field(default_factory=SkillsConfig)
|
||||
model_profiles: dict[str, ModelProfile] = Field(
|
||||
default_factory=dict,
|
||||
description="Per-model overrides keyed by model name prefix",
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def resolve_workspace_root(self) -> "AppConfig":
|
||||
@@ -131,6 +173,39 @@ class AppConfig(BaseModel):
|
||||
self.agent.workspace_root = self.agent.workspace_root.resolve()
|
||||
return self
|
||||
|
||||
def get_model_profile(self, model: str) -> ModelProfile | None:
    """Find the best matching model profile by prefix.

    Matching is case-insensitive and the longest matching key wins
    (e.g., "llama3.1" beats "llama3" for model "llama3.1:latest").
    Keys are compared against the full model name including its tag, so
    a tag-specific key such as "qwen3:8b" can be used next to a family
    key such as "qwen3". Empty keys are ignored.

    Args:
        model: Model name, optionally with a ``:tag`` suffix.

    Returns:
        The matching profile, or None if no profile matches.
    """
    # Match on the whole name: every key that is a prefix of the tag-less
    # name is also a prefix of the full name, so nothing that matched
    # before stops matching, while keys containing a tag become reachable.
    model_lower = model.lower()
    matching_keys = [
        key
        for key in self.model_profiles
        if key and model_lower.startswith(key.lower())
    ]
    if not matching_keys:
        return None
    # max() returns the first of several equally long keys, i.e. the one
    # declared first in the mapping.
    return self.model_profiles[max(matching_keys, key=len)]
|
||||
|
||||
def apply_model_profile(self, model: str) -> ModelProfile | None:
    """Apply the matching model profile overrides to the active config.

    Only profile fields that are not ``None`` are copied; everything
    else on the active config is left untouched.

    Args:
        model: Model name used to look up the profile via
            ``get_model_profile``.

    Returns:
        The applied profile, or None if no profile matched.
    """
    profile = self.get_model_profile(model)
    if profile is None:
        return None

    # (config section, field name) pairs; the field name is the same on
    # the profile and on the section it overrides.
    overrides = (
        (self.agent, "max_conversation_tokens"),
        (self.llm, "thinking"),
        (self.llm, "temperature"),
        (self.llm, "max_tokens"),
    )
    for section, field_name in overrides:
        value = getattr(profile, field_name)
        if value is not None:
            setattr(section, field_name, value)
    return profile
|
||||
|
||||
|
||||
# Default config file location relative to project root
|
||||
_DEFAULT_CONFIG_PATH = Path("config/config.yaml")
|
||||
|
||||
@@ -151,7 +151,12 @@ class LLMClient:
|
||||
if tools:
|
||||
payload["tools"] = tools
|
||||
|
||||
# Merge model-specific extra parameters (e.g., enable_thinking, reasoning_effort)
|
||||
# When thinking is disabled, inject chat_template_kwargs for backends
|
||||
# that support it (Qwen 3.x thinking models).
|
||||
if not self._config.thinking and self._config.model.lower().startswith(("qwen", "qwq")):
|
||||
payload.setdefault("chat_template_kwargs", {})["enable_thinking"] = False
|
||||
|
||||
# Merge model-specific extra parameters (e.g., reasoning_effort)
|
||||
if self._config.extra_body:
|
||||
payload.update(self._config.extra_body)
|
||||
|
||||
@@ -166,20 +171,32 @@ class LLMClient:
|
||||
status_code=response.status_code,
|
||||
)
|
||||
|
||||
chunk_count = 0
|
||||
async for line in response.aiter_lines():
|
||||
if not line.startswith("data: "):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
data = line[6:] # strip "data: " prefix
|
||||
|
||||
# SSE format: "data: {json}" or "data: [DONE]"
|
||||
if line.startswith("data: "):
|
||||
data = line[6:]
|
||||
if data.strip() == "[DONE]":
|
||||
return
|
||||
break
|
||||
elif line.startswith("{"):
|
||||
# Plain NDJSON fallback (some Ollama versions)
|
||||
data = line
|
||||
else:
|
||||
continue
|
||||
|
||||
try:
|
||||
yield json.loads(data)
|
||||
chunk_count += 1
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("malformed_sse_chunk", data=data[:200])
|
||||
|
||||
if chunk_count == 0:
|
||||
logger.warning("empty_stream", model=self._config.model)
|
||||
|
||||
except httpx.ConnectError as e:
|
||||
raise LLMConnectionError(f"Cannot connect to LLM endpoint: {e}") from e
|
||||
except httpx.TimeoutException as e:
|
||||
|
||||
@@ -4,16 +4,20 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import shlex
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
from app.models.config import PermissionsConfig, ToolsConfig
|
||||
from app.models.config import AgentMode, PermissionsConfig, ToolsConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Type alias for the async prompt callback
|
||||
PromptCallback = Callable[[str, str], Awaitable[bool]]
|
||||
|
||||
# Detect shell redirects that write to files (>, >>, heredocs)
|
||||
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
|
||||
|
||||
|
||||
class PermissionDenied(Exception):
|
||||
"""Raised when a tool is denied execution by permissions policy."""
|
||||
@@ -26,6 +30,10 @@ class PermissionsService:
|
||||
shows a modal dialog. Without a callback, unlisted tools are denied.
|
||||
"""
|
||||
|
||||
READ_ONLY_TOOLS: frozenset[str] = frozenset({
|
||||
"read_file", "list_dir", "grep_files", "find_files", "finish",
|
||||
})
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: PermissionsConfig,
|
||||
@@ -34,6 +42,16 @@ class PermissionsService:
|
||||
self.config = config
|
||||
self._tools_config = tools_config
|
||||
self._prompt_callback: PromptCallback | None = None
|
||||
self._mode: AgentMode = AgentMode.NORMAL
|
||||
|
||||
@property
def mode(self) -> AgentMode:
    """Current agent mode."""
    return self._mode

@mode.setter
def mode(self, value: AgentMode) -> None:
    """Set the agent mode read by subsequent permission checks."""
    self._mode = value
|
||||
|
||||
def set_prompt_callback(self, callback: PromptCallback) -> None:
|
||||
"""Set the async callback used to prompt the user for permission.
|
||||
@@ -63,6 +81,16 @@ class PermissionsService:
|
||||
logger.info("Tool '%s' is in deny list — blocked", tool_name)
|
||||
return False
|
||||
|
||||
if self._mode == AgentMode.AUTO:
|
||||
logger.debug("Tool '%s' auto-approved (AUTO mode)", tool_name)
|
||||
return True
|
||||
|
||||
if self._mode == AgentMode.PLAN:
|
||||
if tool_name not in self.READ_ONLY_TOOLS:
|
||||
logger.info("Tool '%s' blocked in Plan mode (read-only tools only)", tool_name)
|
||||
return False
|
||||
return True
|
||||
|
||||
if tool_name in self.config.auto_approve:
|
||||
logger.debug("Tool '%s' is auto-approved", tool_name)
|
||||
return True
|
||||
@@ -104,6 +132,11 @@ class PermissionsService:
|
||||
logger.info("Shell command '%s' matches denied prefix '%s'", cmd, denied)
|
||||
return False
|
||||
|
||||
# Detect shell redirects that write to files — require approval
|
||||
if _WRITE_REDIRECT_PATTERN.search(cmd):
|
||||
logger.info("Shell command '%s' contains file-write redirect — requiring approval", cmd)
|
||||
return None # fall through to user prompt
|
||||
|
||||
# Allowed commands: base executable match
|
||||
if shell_config.allowed_commands:
|
||||
if base_cmd in shell_config.allowed_commands:
|
||||
|
||||
@@ -52,6 +52,10 @@ class SessionManager:
|
||||
self._session_dir = workspace_root / config.session_dir
|
||||
self._session_id = f"{self._workspace_hash}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
|
||||
|
||||
def update_model(self, model: str) -> None:
    """Update the model name for session metadata.

    Args:
        model: New model name; stored on ``self._model``.
    """
    self._model = model
|
||||
|
||||
def save(self, ctx: "SessionContext") -> Path:
|
||||
"""Save session state to a JSON file via atomic write.
|
||||
|
||||
|
||||
@@ -60,8 +60,10 @@ class StreamHandler:
|
||||
"""
|
||||
thinking_notified = False
|
||||
last_update_time = 0.0
|
||||
chunk_count = 0
|
||||
|
||||
async for chunk in chunk_iter:
|
||||
chunk_count += 1
|
||||
self._process_chunk(chunk)
|
||||
|
||||
if not self._display_config.stream_output:
|
||||
@@ -96,6 +98,14 @@ class StreamHandler:
|
||||
self._on_done()
|
||||
|
||||
tool_calls = self._build_tool_calls() or None
|
||||
|
||||
if chunk_count > 0 and not self._accumulated_content and not tool_calls:
|
||||
logger.debug(
|
||||
"stream_empty_result",
|
||||
chunks_received=chunk_count,
|
||||
had_reasoning=bool(self._accumulated_reasoning),
|
||||
)
|
||||
|
||||
return Message(
|
||||
role="assistant",
|
||||
content=self._accumulated_content or None,
|
||||
@@ -183,11 +193,8 @@ class StreamHandler:
|
||||
return bool(self._accumulated_reasoning) and not self._accumulated_content and not self._tool_calls
|
||||
|
||||
def reset(self) -> None:
    """Clear accumulators for the next LLM call, preserving UI callbacks.

    Content, reasoning, tool-call and usage state is discarded. The
    ``_on_content`` / ``_on_thinking`` / ``_on_done`` callbacks are left
    in place so the handler can be reused for another call.
    """
    self._accumulated_content = ""
    self._accumulated_reasoning = ""
    # Clear in place so the existing container object is kept.
    self._tool_calls.clear()
    self._usage = None
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
"""Edit tools: str_replace and patch_apply."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.models.config import AppConfig
|
||||
from app.models.tool_call import ToolResult, ToolResultStatus
|
||||
from app.tools.base import BaseTool
|
||||
from app.utils.file_cache import FileCache, cached_read_file
|
||||
from app.utils.file_helpers import (
|
||||
FileSizeError,
|
||||
PathSecurityError,
|
||||
@@ -37,6 +41,12 @@ class StrReplaceTool(BaseTool):
|
||||
)
|
||||
params_model = StrReplaceParams
|
||||
|
||||
def __init__(
    self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
    """Create the tool, optionally attaching a shared file cache.

    Args:
        workspace_root: Passed through to the base-class initializer.
        config: Passed through to the base-class initializer.
        file_cache: Optional cache used when reading the target file;
            when None, the post-edit cache refresh is skipped.
    """
    super().__init__(workspace_root, config)
    self._file_cache = file_cache
|
||||
|
||||
def execute(
|
||||
self, *, tool_call_id: str, file_path: str, old_str: str, new_str: str, **kwargs: Any
|
||||
) -> ToolResult:
|
||||
@@ -44,11 +54,12 @@ class StrReplaceTool(BaseTool):
|
||||
|
||||
# Read the file
|
||||
try:
|
||||
content = safe_read_file(
|
||||
content = cached_read_file(
|
||||
file_path,
|
||||
self.workspace_root,
|
||||
max_size_bytes=fs_config.max_file_size_bytes,
|
||||
check_binary=fs_config.binary_detection,
|
||||
cache=self._file_cache,
|
||||
)
|
||||
except PathSecurityError as exc:
|
||||
return ToolResult(
|
||||
@@ -117,8 +128,14 @@ class StrReplaceTool(BaseTool):
|
||||
safe_path = resolve_safe_path(file_path, self.workspace_root)
|
||||
rel_path = safe_path.relative_to(self.workspace_root)
|
||||
except (PathSecurityError, ValueError):
|
||||
safe_path = None
|
||||
rel_path = Path(file_path)
|
||||
|
||||
# Pre-warm cache with the new content (we already have it in memory).
|
||||
if self._file_cache is not None and safe_path is not None:
|
||||
self._file_cache.invalidate(safe_path)
|
||||
self._file_cache.put(safe_path, new_content)
|
||||
|
||||
return ToolResult(
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=self.name,
|
||||
@@ -144,6 +161,12 @@ class PatchApplyTool(BaseTool):
|
||||
)
|
||||
params_model = PatchApplyParams
|
||||
|
||||
def __init__(
    self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
    """Create the tool, optionally attaching a shared file cache.

    Args:
        workspace_root: Passed through to the base-class initializer.
        config: Passed through to the base-class initializer.
        file_cache: Optional cache whose entry for the patched file is
            invalidated after a successful patch; None disables that.
    """
    super().__init__(workspace_root, config)
    self._file_cache = file_cache
|
||||
|
||||
def execute(self, *, tool_call_id: str, file_path: str, patch: str, **kwargs: Any) -> ToolResult:
|
||||
try:
|
||||
safe_path = resolve_safe_path(file_path, self.workspace_root)
|
||||
@@ -195,6 +218,9 @@ class PatchApplyTool(BaseTool):
|
||||
error=f"Patch failed (exit {result.returncode}): {result.stderr or result.stdout}",
|
||||
)
|
||||
|
||||
if self._file_cache is not None:
|
||||
self._file_cache.invalidate(safe_path)
|
||||
|
||||
try:
|
||||
rel_path = safe_path.relative_to(self.workspace_root)
|
||||
except ValueError:
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
"""Filesystem tools: read_file, list_dir, write_file, make_dir, delete_file."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.models.config import AppConfig
|
||||
from app.models.tool_call import ToolResult, ToolResultStatus
|
||||
from app.tools.base import BaseTool
|
||||
from app.utils.file_cache import FileCache, cached_read_file
|
||||
from app.utils.file_helpers import (
|
||||
BinaryFileError,
|
||||
FileSizeError,
|
||||
@@ -23,6 +27,12 @@ class ReadFileParams(BaseModel):
|
||||
file_path: str = Field(description="Path to the file to read (relative to workspace root)")
|
||||
|
||||
|
||||
class ReadManyFilesParams(BaseModel):
    """Parameters for the read_many_files tool."""

    file_paths: list[str] = Field(description="List of file paths to read (relative to workspace root)")
|
||||
|
||||
|
||||
class ReadFileTool(BaseTool):
|
||||
"""Read the contents of a file within the workspace."""
|
||||
|
||||
@@ -30,14 +40,22 @@ class ReadFileTool(BaseTool):
|
||||
description = "Read the full contents of a text file. Returns the file content as a string."
|
||||
params_model = ReadFileParams
|
||||
|
||||
def __init__(
    self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
    """Create the tool, optionally attaching a shared file cache.

    Args:
        workspace_root: Passed through to the base-class initializer.
        config: Passed through to the base-class initializer.
        file_cache: Optional cache consulted on reads; with None, a read
            is never reported as a cache hit.
    """
    super().__init__(workspace_root, config)
    self._file_cache = file_cache
|
||||
|
||||
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
|
||||
fs_config = self.config.tools.filesystem
|
||||
hits_before = self._file_cache.stats.hits if self._file_cache else 0
|
||||
try:
|
||||
content = safe_read_file(
|
||||
content = cached_read_file(
|
||||
file_path,
|
||||
self.workspace_root,
|
||||
max_size_bytes=fs_config.max_file_size_bytes,
|
||||
check_binary=fs_config.binary_detection,
|
||||
cache=self._file_cache,
|
||||
)
|
||||
except PathSecurityError as exc:
|
||||
return ToolResult(
|
||||
@@ -47,11 +65,12 @@ class ReadFileTool(BaseTool):
|
||||
error=str(exc),
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
filename = Path(file_path).name
|
||||
return ToolResult(
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=self.name,
|
||||
status=ToolResultStatus.ERROR,
|
||||
error=str(exc),
|
||||
error=f"{exc}. Use find_files to locate it, e.g. find_files(pattern=\"{filename}\")",
|
||||
)
|
||||
except FileSizeError as exc:
|
||||
return ToolResult(
|
||||
@@ -68,6 +87,23 @@ class ReadFileTool(BaseTool):
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
# On cache hit the file is unchanged — its content is already in
|
||||
# conversation context from the earlier read, so avoid resending it.
|
||||
was_cache_hit = (
|
||||
self._file_cache is not None
|
||||
and self._file_cache.stats.hits > hits_before
|
||||
)
|
||||
if was_cache_hit:
|
||||
return ToolResult(
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=self.name,
|
||||
status=ToolResultStatus.SUCCESS,
|
||||
output=(
|
||||
f"[Cached] {file_path} is unchanged since last read "
|
||||
f"({len(content):,} chars). Content is already in conversation context."
|
||||
),
|
||||
)
|
||||
|
||||
return ToolResult(
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=self.name,
|
||||
@@ -76,6 +112,76 @@ class ReadFileTool(BaseTool):
|
||||
)
|
||||
|
||||
|
||||
class ReadManyFilesTool(BaseTool):
    """Read several workspace files in a single tool call.

    Every requested path produces one section in the result. A section starts
    with a ``=== <path> ===`` header and is followed by the file content, a
    short ``[Cached]`` notice, or an ``[ERROR]`` line. A failure on one path
    does not stop the remaining paths from being read.
    """

    name = "read_many_files"
    description = (
        "Read contents of multiple files at once. Returns each file's content "
        "prefixed with its path header."
    )
    params_model = ReadManyFilesParams

    def __init__(
        self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
    ) -> None:
        """Create the tool.

        Args:
            workspace_root: Root directory passed to the base tool and to
                ``cached_read_file``.
            config: Application configuration; the filesystem limits are read
                from ``config.tools.filesystem``.
            file_cache: Optional shared cache; ``None`` disables caching.
        """
        super().__init__(workspace_root, config)
        self._file_cache = file_cache

    def execute(self, *, tool_call_id: str, file_paths: list[str], **kwargs: Any) -> ToolResult:
        """Read every path in *file_paths* and return one combined result.

        Args:
            tool_call_id: Identifier echoed back in the returned ``ToolResult``.
            file_paths: Paths to read, relative to the workspace root.
            **kwargs: Ignored extra arguments.

        Returns:
            An ERROR result when *file_paths* is empty or when no file could be
            read; otherwise a SUCCESS result whose output joins all sections
            (including per-file ``[ERROR]`` sections) with newlines.
        """
        if not file_paths:
            return ToolResult(
                tool_call_id=tool_call_id,
                tool_name=self.name,
                status=ToolResultStatus.ERROR,
                error="file_paths list is empty",
            )

        fs_config = self.config.tools.filesystem
        cache = self._file_cache
        sections: list[str] = []
        success_count = 0

        for fp in file_paths:
            # Compare against None explicitly: FileCache defines __len__, so an
            # empty cache is falsy. A truthiness test would reset the baseline
            # to 0 and make a fresh read look like a cache hit whenever
            # stats.hits is already positive.
            hits_before = cache.stats.hits if cache is not None else 0
            try:
                content = cached_read_file(
                    fp,
                    self.workspace_root,
                    max_size_bytes=fs_config.max_file_size_bytes,
                    check_binary=fs_config.binary_detection,
                    cache=cache,
                )
            except (
                PathSecurityError,
                FileNotFoundError,
                FileSizeError,
                BinaryFileError,
                # Directories, permission problems and undecodable text must
                # only fail their own section, not the whole batch.
                OSError,
                UnicodeDecodeError,
            ) as exc:
                sections.append(f"=== {fp} ===\n[ERROR] {exc}")
                continue

            # A hit is detected by the cache's hit counter having advanced
            # during this read.
            was_hit = cache is not None and cache.stats.hits > hits_before
            if was_hit:
                sections.append(
                    f"=== {fp} ===\n[Cached] Unchanged since last read "
                    f"({len(content):,} chars). Already in conversation context."
                )
            else:
                sections.append(f"=== {fp} ===\n{content}")
            success_count += 1

        if success_count == 0:
            return ToolResult(
                tool_call_id=tool_call_id,
                tool_name=self.name,
                status=ToolResultStatus.ERROR,
                error="All files failed to read:\n" + "\n".join(sections),
            )

        return ToolResult(
            tool_call_id=tool_call_id,
            tool_name=self.name,
            status=ToolResultStatus.SUCCESS,
            output="\n".join(sections),
        )
|
||||
|
||||
|
||||
class ListDirParams(BaseModel):
|
||||
"""Parameters for the list_dir tool."""
|
||||
|
||||
@@ -167,6 +273,12 @@ class WriteFileTool(BaseTool):
|
||||
)
|
||||
params_model = WriteFileParams
|
||||
|
||||
def __init__(
|
||||
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
|
||||
) -> None:
|
||||
super().__init__(workspace_root, config)
|
||||
self._file_cache = file_cache
|
||||
|
||||
def execute(self, *, tool_call_id: str, file_path: str, content: str, **kwargs: Any) -> ToolResult:
|
||||
fs_config = self.config.tools.filesystem
|
||||
try:
|
||||
@@ -191,6 +303,9 @@ class WriteFileTool(BaseTool):
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
if self._file_cache is not None:
|
||||
self._file_cache.invalidate(safe_path)
|
||||
|
||||
try:
|
||||
rel_path = safe_path.relative_to(self.workspace_root)
|
||||
except ValueError:
|
||||
@@ -272,6 +387,12 @@ class DeleteFileTool(BaseTool):
|
||||
description = "Delete a single file. Does not delete directories."
|
||||
params_model = DeleteFileParams
|
||||
|
||||
def __init__(
|
||||
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
|
||||
) -> None:
|
||||
super().__init__(workspace_root, config)
|
||||
self._file_cache = file_cache
|
||||
|
||||
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
|
||||
try:
|
||||
safe_path = resolve_safe_path(file_path, self.workspace_root)
|
||||
@@ -309,6 +430,9 @@ class DeleteFileTool(BaseTool):
|
||||
error=f"Failed to delete file: {exc}",
|
||||
)
|
||||
|
||||
if self._file_cache is not None:
|
||||
self._file_cache.invalidate(safe_path)
|
||||
|
||||
try:
|
||||
rel_path = safe_path.relative_to(self.workspace_root)
|
||||
except ValueError:
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any
|
||||
|
||||
from app.models.config import AppConfig
|
||||
from app.tools.base import BaseTool
|
||||
from app.utils.file_cache import FileCache
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.services.skills import SkillsManager
|
||||
@@ -89,6 +90,7 @@ def create_default_registry(
|
||||
config: AppConfig,
|
||||
skills_manager: SkillsManager | None = None,
|
||||
skill_runner: object | None = None,
|
||||
file_cache: FileCache | None = None,
|
||||
) -> ToolRegistry:
|
||||
"""Create a ToolRegistry populated with all built-in tools.
|
||||
|
||||
@@ -97,9 +99,10 @@ def create_default_registry(
|
||||
config: Application configuration.
|
||||
skills_manager: Optional skills manager for skill tools.
|
||||
skill_runner: Optional SkillRunner for package skill activation.
|
||||
file_cache: Optional file cache shared across file-reading tools.
|
||||
"""
|
||||
# Read tools
|
||||
from app.tools.filesystem import ListDirTool, ReadFileTool
|
||||
from app.tools.filesystem import ListDirTool, ReadFileTool, ReadManyFilesTool
|
||||
|
||||
# Write tools
|
||||
from app.tools.filesystem import DeleteFileTool, MakeDirTool, WriteFileTool
|
||||
@@ -119,7 +122,8 @@ def create_default_registry(
|
||||
registry = ToolRegistry()
|
||||
|
||||
# Read
|
||||
registry.register(ReadFileTool(workspace_root, config))
|
||||
registry.register(ReadFileTool(workspace_root, config, file_cache=file_cache))
|
||||
registry.register(ReadManyFilesTool(workspace_root, config, file_cache=file_cache))
|
||||
registry.register(ListDirTool(workspace_root, config))
|
||||
|
||||
# Search
|
||||
@@ -127,13 +131,13 @@ def create_default_registry(
|
||||
registry.register(FindFilesTool(workspace_root, config))
|
||||
|
||||
# Write
|
||||
registry.register(WriteFileTool(workspace_root, config))
|
||||
registry.register(WriteFileTool(workspace_root, config, file_cache=file_cache))
|
||||
registry.register(MakeDirTool(workspace_root, config))
|
||||
registry.register(DeleteFileTool(workspace_root, config))
|
||||
registry.register(DeleteFileTool(workspace_root, config, file_cache=file_cache))
|
||||
|
||||
# Edit
|
||||
registry.register(StrReplaceTool(workspace_root, config))
|
||||
registry.register(PatchApplyTool(workspace_root, config))
|
||||
registry.register(StrReplaceTool(workspace_root, config, file_cache=file_cache))
|
||||
registry.register(PatchApplyTool(workspace_root, config, file_cache=file_cache))
|
||||
|
||||
# Shell
|
||||
registry.register(RunCommandTool(workspace_root, config))
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Shell tool: run_command."""
|
||||
|
||||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
from typing import Any
|
||||
@@ -11,6 +12,9 @@ from app.tools.base import BaseTool
|
||||
|
||||
_DEFAULT_TIMEOUT = 30
|
||||
|
||||
# Detect output redirection that can write files (>, >>) as well as heredocs/here-strings (<<)
|
||||
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
|
||||
|
||||
|
||||
class RunCommandParams(BaseModel):
|
||||
"""Parameters for the run_command tool."""
|
||||
@@ -43,6 +47,18 @@ class RunCommandTool(BaseTool):
|
||||
error=f"Command denied: matches blocked prefix '{denied}'",
|
||||
)
|
||||
|
||||
# Defense-in-depth: flag file-write redirects in tool result
|
||||
if _WRITE_REDIRECT_PATTERN.search(command):
|
||||
return ToolResult(
|
||||
tool_call_id=tool_call_id,
|
||||
tool_name=self.name,
|
||||
status=ToolResultStatus.ERROR,
|
||||
error=(
|
||||
f"Command contains file-write redirect (>, >>, or <<) "
|
||||
f"which bypasses file-write permissions. Use write_file instead."
|
||||
),
|
||||
)
|
||||
|
||||
# Allow check: first token must be in allowed_commands
|
||||
try:
|
||||
tokens = shlex.split(command)
|
||||
|
||||
101
app/ui/app.py
101
app/ui/app.py
@@ -10,18 +10,19 @@ from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.widgets import Header, Input, RichLog
|
||||
from textual.widgets import Input, RichLog
|
||||
from textual import work
|
||||
|
||||
from app.agent.context import SessionContext
|
||||
from app.agent.loop import AgentLoop
|
||||
from app.models.config import AppConfig
|
||||
from app.models.config import AgentMode, AppConfig
|
||||
from app.services.llm import LLMClient
|
||||
from app.services.permissions import PermissionsService
|
||||
from app.services.session import SessionManager
|
||||
from app.services.streaming import StreamHandler
|
||||
from app.tools.registry import create_default_registry
|
||||
from app.ui.widgets import (
|
||||
HeaderPanel,
|
||||
HistoryInput,
|
||||
PermissionModal,
|
||||
SessionResumeModal,
|
||||
@@ -45,6 +46,7 @@ class SneakyCodeApp(App):
|
||||
|
||||
BINDINGS = [
|
||||
Binding("ctrl+c", "cancel_or_quit", "Cancel/Quit", show=False),
|
||||
Binding("ctrl+p", "cycle_mode", "Cycle Mode"),
|
||||
]
|
||||
|
||||
def __init__(self, config: AppConfig, session_mgr: SessionManager | None = None) -> None:
|
||||
@@ -61,10 +63,9 @@ class SneakyCodeApp(App):
|
||||
self._skill_runner = None
|
||||
self._current_worker: Worker | None = None
|
||||
self._cancel_count = 0
|
||||
self.sub_title = config.llm.model
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
yield HeaderPanel(model_name=self._config.llm.model)
|
||||
yield RichLog(id="chat-log", highlight=True, markup=True)
|
||||
yield StreamingStatic("", id="streaming")
|
||||
yield StatusBar()
|
||||
@@ -74,6 +75,9 @@ class SneakyCodeApp(App):
|
||||
"""Initialize agent components after the app is mounted."""
|
||||
setup_logging_for_tui()
|
||||
|
||||
# Apply model profile for the initial model before creating context
|
||||
self._config.apply_model_profile(self._config.llm.model)
|
||||
|
||||
self._ctx = SessionContext(self._config)
|
||||
|
||||
# Create long-lived agent dependencies (reused across turns)
|
||||
@@ -96,11 +100,20 @@ class SneakyCodeApp(App):
|
||||
self._config.skills, self._config.agent.workspace_root
|
||||
)
|
||||
|
||||
# Create file cache if enabled
|
||||
self._file_cache = None
|
||||
fs_cache_cfg = self._config.tools.filesystem.cache
|
||||
if fs_cache_cfg.enabled:
|
||||
from app.utils.file_cache import FileCache
|
||||
|
||||
self._file_cache = FileCache(max_entries=fs_cache_cfg.max_entries)
|
||||
|
||||
# Create tool registry (SkillRunner wired after registry exists)
|
||||
self._tool_registry = create_default_registry(
|
||||
self._config.agent.workspace_root,
|
||||
self._config,
|
||||
skills_manager=self._skills_manager,
|
||||
file_cache=self._file_cache,
|
||||
)
|
||||
|
||||
# Create SkillRunner and late-bind it to skill tools
|
||||
@@ -156,6 +169,10 @@ class SneakyCodeApp(App):
|
||||
event.input.record(user_input)
|
||||
log = self.query_one("#chat-log", RichLog)
|
||||
|
||||
# Echo user prompt (condensed for multi-line)
|
||||
from app.utils.display import render_user_message
|
||||
log.write(render_user_message(user_input))
|
||||
|
||||
# Handle slash commands
|
||||
if user_input.startswith("/"):
|
||||
await self._handle_slash_command(user_input, log)
|
||||
@@ -184,8 +201,10 @@ class SneakyCodeApp(App):
|
||||
table.add_row("/history", "Show conversation history")
|
||||
table.add_row("/save", "Manually save session")
|
||||
table.add_row("/session", "Show session info (messages, tokens, start time)")
|
||||
table.add_row("/models", "List available Ollama models")
|
||||
table.add_row("/models <name>", "Switch to a different model")
|
||||
table.add_row("/models, /model", "List available Ollama models")
|
||||
table.add_row("/model <name>", "Switch to a different model")
|
||||
table.add_row("/mode", "Show current agent mode")
|
||||
table.add_row("/mode normal|plan|auto", "Switch agent mode")
|
||||
table.add_row("/skills", "List available skills")
|
||||
table.add_row("/<skill>", "Load a skill by name")
|
||||
log.write(table)
|
||||
@@ -215,7 +234,7 @@ class SneakyCodeApp(App):
|
||||
f"Started: {self._ctx.start_time.isoformat()}",
|
||||
style="cyan",
|
||||
))
|
||||
elif cmd.startswith("/models"):
|
||||
elif cmd.split()[0] in ("/models", "/model"):
|
||||
parts = command.split(maxsplit=1)
|
||||
if len(parts) == 1:
|
||||
# List available models
|
||||
@@ -239,8 +258,44 @@ class SneakyCodeApp(App):
|
||||
else:
|
||||
new_model = parts[1].strip()
|
||||
self._config.llm.model = new_model
|
||||
self.sub_title = new_model
|
||||
log.write(Text(f"Switched to model: {new_model}", style="bold green"))
|
||||
if self._session_mgr:
|
||||
self._session_mgr.update_model(new_model)
|
||||
# Apply model-specific profile overrides
|
||||
profile = self._config.apply_model_profile(new_model)
|
||||
if profile and self._ctx:
|
||||
# Update token budget if the profile overrides it
|
||||
self._ctx.token_counter.budget = self._config.agent.max_conversation_tokens
|
||||
self.query_one(HeaderPanel).update_model(new_model)
|
||||
header = self.query_one(HeaderPanel)
|
||||
header.update_tokens(
|
||||
self._ctx.estimated_tokens if self._ctx else 0,
|
||||
self._config.agent.max_conversation_tokens,
|
||||
)
|
||||
msg = f"Switched to model: {new_model}"
|
||||
if profile:
|
||||
overrides = []
|
||||
if profile.max_conversation_tokens is not None:
|
||||
overrides.append(f"tokens={profile.max_conversation_tokens:,}")
|
||||
if profile.thinking is not None:
|
||||
overrides.append(f"thinking={'on' if profile.thinking else 'off'}")
|
||||
if overrides:
|
||||
msg += f" ({', '.join(overrides)})"
|
||||
log.write(Text(msg, style="bold green"))
|
||||
elif cmd.split()[0] == "/mode":
|
||||
parts = command.split(maxsplit=1)
|
||||
if len(parts) == 1:
|
||||
current = self._permissions.mode
|
||||
log.write(Text(f"Current mode: {current.value}", style="cyan"))
|
||||
else:
|
||||
mode_str = parts[1].strip().lower()
|
||||
try:
|
||||
new_mode = AgentMode(mode_str)
|
||||
except ValueError:
|
||||
log.write(Text(f"Unknown mode: {mode_str}. Use normal, plan, or auto.", style="yellow"))
|
||||
return
|
||||
self._permissions.mode = new_mode
|
||||
self.query_one(HeaderPanel).update_mode(new_mode)
|
||||
log.write(Text(f"Switched to {new_mode.value} mode", style="bold green"))
|
||||
elif cmd == "/skills":
|
||||
if self._skills_manager:
|
||||
skills = self._skills_manager.list_skills()
|
||||
@@ -302,12 +357,19 @@ class SneakyCodeApp(App):
|
||||
status_bar.start_streaming()
|
||||
|
||||
# Set up streaming UI callbacks
|
||||
header = self.query_one(HeaderPanel)
|
||||
|
||||
def on_content(content: str) -> None:
|
||||
streaming_widget.update(
|
||||
Panel(Markdown(content), title="Assistant", border_style="green", expand=True)
|
||||
)
|
||||
streaming_widget.show_streaming()
|
||||
status_bar.update_stream_tokens(len(content) // 4)
|
||||
stream_tokens = len(content) // 4
|
||||
status_bar.update_stream_tokens(stream_tokens)
|
||||
header.update_tokens(
|
||||
self._ctx.estimated_tokens + stream_tokens,
|
||||
self._ctx.token_counter.budget,
|
||||
)
|
||||
|
||||
def on_thinking() -> None:
|
||||
streaming_widget.update(Text("Thinking...", style="dim"))
|
||||
@@ -331,6 +393,10 @@ class SneakyCodeApp(App):
|
||||
|
||||
status_bar.stop_streaming()
|
||||
|
||||
# Update token display in header
|
||||
header = self.query_one(HeaderPanel)
|
||||
header.update_tokens(self._ctx.estimated_tokens, self._ctx.token_counter.budget)
|
||||
|
||||
# Update skill indicator (skill may have been deactivated via finish_skill)
|
||||
if self._skill_runner and not self._skill_runner.is_active:
|
||||
status_bar.set_active_skill(None)
|
||||
@@ -356,6 +422,21 @@ class SneakyCodeApp(App):
|
||||
log = self.query_one("#chat-log", RichLog)
|
||||
log.write(Text("⚠ Cancelling... (press Ctrl+C again to quit)", style="yellow"))
|
||||
|
||||
def action_cycle_mode(self) -> None:
    """Advance the agent mode one step: Normal → Plan → Auto → Normal.

    Does nothing while the permissions service is still ``None``.
    """
    permissions = self._permissions
    if permissions is None:
        return

    # Pair each mode with the one that follows it, wrapping at the end.
    order = (AgentMode.NORMAL, AgentMode.PLAN, AgentMode.AUTO)
    successor = dict(zip(order, order[1:] + order[:1]))

    next_mode = successor[permissions.mode]
    permissions.mode = next_mode
    self.query_one(HeaderPanel).update_mode(next_mode)

    chat_log = self.query_one("#chat-log", RichLog)
    chat_log.write(Text(f"Mode: {next_mode.value}", style="bold green"))
|
||||
|
||||
async def on_unmount(self) -> None:
|
||||
"""Clean up the LLM client on app shutdown."""
|
||||
if self._client is not None:
|
||||
|
||||
@@ -55,8 +55,8 @@ Screen {
|
||||
Input {
|
||||
dock: bottom;
|
||||
margin: 0;
|
||||
border: heavy darkcyan;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
Header {
|
||||
dock: top;
|
||||
}
|
||||
/* HeaderPanel styles are in DEFAULT_CSS on the widget itself */
|
||||
|
||||
@@ -11,6 +11,82 @@ from textual.widgets import Button, Input, Static
|
||||
|
||||
from rich.text import Text
|
||||
|
||||
from app.models.config import AgentMode
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Header Panel
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class HeaderPanel(Static):
    """One-row header bar: app name and model, agent-mode badge, token usage."""

    DEFAULT_CSS = """
    HeaderPanel {
        dock: top;
        height: 1;
        background: darkcyan;
        color: $text;
        padding: 0 2;
    }
    """

    # Badge label and Rich style for every agent mode.
    _MODE_BADGES = {
        AgentMode.NORMAL: ("NORMAL", "bold black on white"),
        AgentMode.PLAN: ("PLAN", "bold black on yellow"),
        AgentMode.AUTO: ("AUTO", "bold white on red"),
    }

    def __init__(self, model_name: str) -> None:
        """Start with *model_name*, NORMAL mode and zeroed token counters."""
        super().__init__("")
        self._model_name = model_name
        self._mode: AgentMode = AgentMode.NORMAL
        self._tokens: int = 0
        self._budget: int = 0

    def on_resize(self) -> None:
        """Re-render so the gaps follow the new widget width."""
        self._refresh_display()

    def update_model(self, name: str) -> None:
        """Show *name* as the active model."""
        self._model_name = name
        self._refresh_display()

    def update_mode(self, mode: AgentMode) -> None:
        """Show *mode* in the badge."""
        self._mode = mode
        self._refresh_display()

    def update_tokens(self, tokens: int, budget: int) -> None:
        """Show *tokens* used out of *budget*."""
        self._tokens = tokens
        self._budget = budget
        self._refresh_display()

    def _refresh_display(self) -> None:
        """Compose the three header sections and push them to the widget."""
        title = Text.assemble(
            ("⚡ SneakyCode", "bold"),
            " │ ",
            (self._model_name, "bold"),
        )

        label, badge_style = self._MODE_BADGES[self._mode]
        badge = Text.assemble((" ", badge_style), (label, badge_style), (" ", badge_style))

        usage = Text(f"~{self._tokens:,} / {self._budget:,} tokens")

        # Width taken by the three sections plus one separating space on each
        # side of the badge, measured in characters of the plain text.
        used = len(title.plain) + len(badge.plain) + len(usage.plain) + 2

        # Fall back to 80 columns while the widget has no width yet.
        width = self.size.width
        if width <= 0:
            width = 80

        spare = width - used
        pad_before = max(1, spare // 2)
        pad_after = max(1, spare - pad_before)

        self.update(
            Text.assemble(title, " " * pad_before, badge, " " * pad_after, usage)
        )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Modal Dialogs
|
||||
@@ -139,8 +215,6 @@ class StatusBar(Static):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("")
|
||||
self._tokens: int = 0
|
||||
self._budget: int = 0
|
||||
self._iteration: int = 0
|
||||
self._max_iterations: int = 0
|
||||
self._streaming: bool = False
|
||||
@@ -149,12 +223,6 @@ class StatusBar(Static):
|
||||
self._stream_tokens: int = 0
|
||||
self._active_skill: str | None = None
|
||||
|
||||
def update_tokens(self, tokens: int, budget: int) -> None:
|
||||
"""Update the token usage display."""
|
||||
self._tokens = tokens
|
||||
self._budget = budget
|
||||
self._refresh_display()
|
||||
|
||||
def update_iteration(self, iteration: int, max_iterations: int) -> None:
|
||||
"""Update the iteration count display."""
|
||||
self._iteration = iteration
|
||||
@@ -200,8 +268,6 @@ class StatusBar(Static):
|
||||
parts.append(f"{spinner} Thinking")
|
||||
if self._stream_tokens > 0:
|
||||
parts.append(f"~{self._stream_tokens:,} tokens")
|
||||
if self._budget > 0:
|
||||
parts.append(f"Tokens: ~{self._tokens:,} / {self._budget:,}")
|
||||
if self._max_iterations > 0:
|
||||
parts.append(f"Iteration {self._iteration}/{self._max_iterations}")
|
||||
self.update(Text(" \u2502 ".join(parts), style="dim"))
|
||||
|
||||
@@ -44,9 +44,22 @@ if TYPE_CHECKING:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def render_user_message(content: str) -> Panel:
|
||||
"""Render a user message as a styled panel."""
|
||||
return Panel(content, title="You", border_style="cyan", expand=False)
|
||||
def render_user_message(content: str) -> Text:
    """Build the one-line echo of a user prompt.

    Only the first line of *content* is shown. A first line longer than 120
    characters is cut and suffixed with an ellipsis, and input with more than
    one line gets a ``(+N lines)`` suffix counting the hidden lines.
    """
    max_len = 120
    rows = content.splitlines()

    headline = rows[0] if rows else content
    if len(headline) > max_len:
        headline = f"{headline[:max_len]}…"

    hidden = len(rows) - 1
    if hidden > 0:
        headline += f" (+{hidden} lines)"

    return Text.assemble(("You: ", "bold cyan"), (headline, "cyan"))
|
||||
|
||||
|
||||
def render_assistant_message(content: str) -> Panel:
|
||||
@@ -223,8 +236,8 @@ def print_success(message: str) -> None:
|
||||
|
||||
|
||||
def print_user_message(content: str) -> None:
|
||||
"""Print a user message in a styled panel."""
|
||||
console.print(Panel(content, title="You", border_style="cyan", expand=False))
|
||||
"""Print a condensed user prompt line."""
|
||||
console.print(render_user_message(content))
|
||||
|
||||
|
||||
def print_assistant_message(content: str) -> None:
|
||||
|
||||
185
app/utils/file_cache.py
Normal file
185
app/utils/file_cache.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""File cache with LRU eviction and mtime-based invalidation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import OrderedDict
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from app.utils.file_helpers import (
|
||||
BinaryFileError,
|
||||
FileSizeError,
|
||||
PathSecurityError,
|
||||
check_file_size,
|
||||
is_binary_file,
|
||||
resolve_safe_path,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class CacheEntry:
|
||||
"""A cached file's content and modification timestamp."""
|
||||
|
||||
content: str
|
||||
mtime_ns: int
|
||||
|
||||
|
||||
@dataclass
class CacheStats:
    """Counters describing how a FileCache instance has been used."""

    hits: int = 0
    misses: int = 0
    invalidations: int = 0
    evictions: int = 0

    @property
    def hit_rate(self) -> float:
        """Share of lookups that were hits, from 0.0 to 1.0 (0.0 before any lookup)."""
        lookups = self.hits + self.misses
        return self.hits / lookups if lookups else 0.0
|
||||
|
||||
|
||||
class FileCache:
    """LRU file-content cache with mtime-based invalidation.

    Keyed by resolved absolute ``Path``. Each lookup performs a cheap
    ``stat()`` syscall to verify the file hasn't changed on disk — if the
    nanosecond mtime differs the entry is evicted and the caller gets a
    cache miss.

    An instance is always truthy, even when it holds no entries, so that
    ``if cache:`` can be used as a ``None`` check.

    Not thread-safe (single-threaded agent loop).
    """

    def __init__(self, max_entries: int = 128) -> None:
        """Create an empty cache.

        Args:
            max_entries: Maximum number of files kept. Values below zero are
                treated as zero, which means nothing is retained.
        """
        # A negative capacity would make the eviction loop in put() pop from
        # an empty OrderedDict and raise KeyError.
        self._max_entries = max(0, max_entries)
        self._entries: OrderedDict[Path, CacheEntry] = OrderedDict()
        self._stats = CacheStats()

    # -- public API --------------------------------------------------

    def get(self, path: Path) -> str | None:
        """Return cached content if *path* hasn't changed, else ``None``.

        A ``stat()`` call checks ``st_mtime_ns``; on mismatch the stale
        entry is silently removed. Every call counts as either a hit or a
        miss in :attr:`stats`.
        """
        entry = self._entries.get(path)
        if entry is None:
            self._stats.misses += 1
            return None

        try:
            current_mtime_ns = path.stat().st_mtime_ns
        except OSError:
            # File gone or unreadable — evict and miss.
            self._remove(path)
            self._stats.misses += 1
            return None

        if current_mtime_ns != entry.mtime_ns:
            self._remove(path)
            self._stats.invalidations += 1
            self._stats.misses += 1
            return None

        # Cache hit — move to end (most-recently used).
        self._entries.move_to_end(path)
        self._stats.hits += 1
        return entry.content

    def put(self, path: Path, content: str) -> None:
        """Store *content* for *path* with its current ``st_mtime_ns``.

        Nothing is stored when *path* cannot be stat'ed. Evicts the
        least-recently-used entries while the cache is over capacity.
        """
        try:
            mtime_ns = path.stat().st_mtime_ns
        except OSError:
            # Can't stat — don't cache.
            return

        # Assigning keeps an existing key at its old position, so move it to
        # the most-recently-used end explicitly (a no-op for new keys).
        self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
        self._entries.move_to_end(path)

        # Evict LRU entries while over capacity.
        while self._entries and len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)
            self._stats.evictions += 1

    def invalidate(self, path: Path) -> None:
        """Remove *path* from the cache if present, counting an invalidation."""
        if path in self._entries:
            del self._entries[path]
            self._stats.invalidations += 1

    def clear(self) -> None:
        """Remove all entries; the statistics are left untouched."""
        self._entries.clear()

    @property
    def stats(self) -> CacheStats:
        """Return the running cache statistics."""
        return self._stats

    def __len__(self) -> int:
        """Return the number of cached files."""
        return len(self._entries)

    def __bool__(self) -> bool:
        """Always ``True``.

        Without this, ``__len__`` would make an empty cache falsy, and
        callers that write ``x if cache else y`` would treat an empty cache
        as if no cache had been configured.
        """
        return True

    # -- internals ---------------------------------------------------

    def _remove(self, path: Path) -> None:
        """Delete an entry without bumping invalidation stats."""
        self._entries.pop(path, None)
|
||||
|
||||
|
||||
def cached_read_file(
    path: str | Path,
    workspace_root: Path,
    max_size_bytes: int = 1_048_576,
    check_binary: bool = True,
    cache: FileCache | None = None,
) -> str:
    """Return the UTF-8 text of *path*, consulting *cache* when one is given.

    Path resolution, the existence check, the size check and (optionally)
    the binary check are performed on every call, before the cache is
    looked at. A cache hit only saves the ``read_text()`` call. With
    ``cache=None`` the file is always read from disk.

    Args:
        path: File to read, resolved through ``resolve_safe_path`` against
            *workspace_root*.
        workspace_root: Root directory used for path resolution.
        max_size_bytes: Limit passed to ``check_file_size``.
        check_binary: Whether to reject files that ``is_binary_file`` flags.
        cache: Optional cache that is queried and, on a miss, updated.

    Raises:
        PathSecurityError: If the path escapes the workspace.
        FileSizeError: If the file is too large.
        BinaryFileError: If the file is binary and *check_binary* is True.
        FileNotFoundError: If the file does not exist.
    """
    target = resolve_safe_path(path, workspace_root)
    if not target.exists():
        raise FileNotFoundError(f"File not found: {target}")

    check_file_size(target, max_size_bytes)
    if check_binary and is_binary_file(target):
        raise BinaryFileError(f"File appears to be binary: {target}")

    # No cache configured: plain disk read.
    if cache is None:
        return target.read_text(encoding="utf-8")

    hit = cache.get(target)
    if hit is not None:
        return hit

    # Miss: read from disk and remember the result.
    text = target.read_text(encoding="utf-8")
    cache.put(target, text)
    return text
|
||||
@@ -36,6 +36,11 @@ class TokenCounter:
|
||||
"""The configured token budget."""
|
||||
return self._budget
|
||||
|
||||
@budget.setter
def budget(self, value: int) -> None:
    """Replace the configured token budget (e.g., when switching models).

    Args:
        value: New budget; stored as-is without validation.
    """
    self._budget = value
|
||||
|
||||
@property
|
||||
def cumulative_usage(self) -> TokenUsage:
|
||||
"""Cumulative token usage across all tracked calls."""
|
||||
|
||||
@@ -10,21 +10,32 @@ llm:
|
||||
max_retries: 3
|
||||
retry_backoff_base: 1.0
|
||||
retry_backoff_max: 30.0
|
||||
thinking: false # Disable model thinking/reasoning mode (reduces reasoning-only loops)
|
||||
# Extra parameters merged into the API request body (model-specific).
|
||||
# Examples:
|
||||
# Qwen 3.x: enable_thinking: false
|
||||
# DeepSeek: enable_thinking: false
|
||||
# OpenAI: reasoning_effort: "low"
|
||||
extra_body:
|
||||
enable_thinking: false
|
||||
extra_body: {}
|
||||
|
||||
agent:
|
||||
max_iterations: 25
|
||||
max_conversation_tokens: 32000
|
||||
max_conversation_tokens: 32000 # Default token budget (overridden by model_profiles)
|
||||
workspace_root: "."
|
||||
truncation_keep_recent: 10
|
||||
truncation_threshold: 0.85
|
||||
|
||||
# Per-model overrides — matched by longest model name prefix.
|
||||
# Unset fields fall through to the defaults above.
|
||||
model_profiles:
|
||||
llama3:
|
||||
max_conversation_tokens: 120000
|
||||
thinking: false
|
||||
qwen:
|
||||
max_conversation_tokens: 32000
|
||||
thinking: false
|
||||
qwq:
|
||||
max_conversation_tokens: 32000
|
||||
thinking: true
|
||||
|
||||
permissions:
|
||||
auto_approve:
|
||||
- read_file
|
||||
@@ -50,7 +61,6 @@ tools:
|
||||
- pytest
|
||||
- ruff
|
||||
- ls
|
||||
- cat
|
||||
- head
|
||||
- tail
|
||||
- wc
|
||||
@@ -58,6 +68,10 @@ tools:
|
||||
- grep
|
||||
- find
|
||||
- echo
|
||||
- which
|
||||
- jq
|
||||
- type
|
||||
- file
|
||||
denied_commands:
|
||||
- rm -rf /
|
||||
- sudo
|
||||
@@ -67,6 +81,9 @@ tools:
|
||||
filesystem:
|
||||
max_file_size_bytes: 1048576 # 1 MB
|
||||
binary_detection: true
|
||||
cache:
|
||||
enabled: true
|
||||
max_entries: 128
|
||||
|
||||
session:
|
||||
session_dir: ".sneakycode/sessions"
|
||||
|
||||
144
docs/ROADMAP.md
144
docs/ROADMAP.md
@@ -1,144 +0,0 @@
|
||||
# SneakyCode Implementation Roadmap
|
||||
|
||||
A phased plan progressing from bare-bones foundation to full autonomous coding agent.
|
||||
|
||||
---
|
||||
|
||||
## Phase 1 — Foundation: Models, Config, and Utilities
|
||||
|
||||
Establish the data layer and shared infrastructure everything else builds on.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/models/config.py` | Pydantic v2 config model — load and validate `config/config.yaml` |
|
||||
| `app/models/message.py` | Message schema (role, content, tool_calls) |
|
||||
| `app/models/tool_call.py` | ToolCall and ToolResult schemas |
|
||||
| `app/utils/logging.py` | Centralized logger with Rich handler |
|
||||
| `app/utils/display.py` | Rich console output helpers (stub — expanded in Phase 2) |
|
||||
| `app/utils/file_helpers.py` | Safe path resolution, binary detection, size guards |
|
||||
| `app/utils/token_counter.py` | Approximate token usage tracking (character-based heuristic for v1) |
|
||||
| `app/main.py` | Entrypoint stub — arg parsing, config load, Rich console setup |
|
||||
|
||||
**Exit criteria:** `python -m app.main --help` runs, config loads and validates, models can be instantiated and serialized.
|
||||
|
||||
---
|
||||
|
||||
## Phase 2 — TUI and Interactive Shell
|
||||
|
||||
Get a working interactive terminal before wiring up the LLM.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/main.py` | Rich-based interactive REPL loop — prompt for user input, display responses |
|
||||
| `app/utils/display.py` | Formatted output for agent messages, tool calls, errors, token usage |
|
||||
| `app/agent/context.py` | Session state and conversation history management |
|
||||
|
||||
**Exit criteria:** User can type messages into a styled REPL, see them echoed back with formatting, and conversation history is tracked in memory.
|
||||
|
||||
---
|
||||
|
||||
## Phase 3 — LLM Integration (Ollama)
|
||||
|
||||
Connect to the local LLM and stream responses into the TUI.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/services/llm.py` | Async httpx client wrapping Ollama's OpenAI-compatible `/v1/chat/completions` endpoint |
|
||||
| `app/services/streaming.py` | SSE parsing, Rich live display, tool call extraction from accumulated stream |
|
||||
|
||||
**Integration:** Wire LLM into the REPL — user message goes to LLM, streamed response displays in real time.
|
||||
|
||||
**Exit criteria:** User can chat with the local model through the TUI with streamed output. Tool call JSON is parsed from the stream but not yet executed.
|
||||
|
||||
---
|
||||
|
||||
## Phase 4 — Tool Framework and Core Tools
|
||||
|
||||
Build the tool abstraction and implement safe, read-only tools first.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/tools/base.py` | `BaseTool` ABC and `ToolResult` dataclass |
|
||||
| `app/tools/registry.py` | Tool registration, discovery, and JSON schema export for LLM system prompt |
|
||||
| `app/services/permissions.py` | Two-tier approval gating (auto-approve reads; prompt for writes/deletes/shell) |
|
||||
| `app/tools/filesystem.py` | `read_file`, `list_dir` |
|
||||
| `app/tools/search.py` | `grep_files`, `find_files` |
|
||||
|
||||
**Exit criteria:** Tools register themselves, schemas export correctly for inclusion in the system prompt, read-only tools execute and return `ToolResult` objects. Permissions service gates execution.
|
||||
|
||||
---
|
||||
|
||||
## Phase 5 — Agent Loop (ReAct)
|
||||
|
||||
The core autonomy layer — reason, act, observe, repeat.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/agent/loop.py` | ReAct cycle: send conversation to LLM, parse tool calls, execute, feed results back, repeat |
|
||||
|
||||
**Key behaviors:**
|
||||
- System prompt constructed with tool schemas from registry
|
||||
- Permissions checks before each tool execution
|
||||
- Loop termination on: plain-text response (no tool calls), explicit `finish` tool call, or `max_iterations` exceeded
|
||||
|
||||
**Exit criteria:** Agent can autonomously answer questions about the codebase by chaining `read_file`, `list_dir`, `grep_files`, and `find_files` tool calls in a multi-turn loop.
|
||||
|
||||
---
|
||||
|
||||
## Phase 6 — Write Tools and Shell
|
||||
|
||||
Unlock the agent's ability to modify code and run commands.
|
||||
|
||||
| File | Description |
|
||||
|------|-------------|
|
||||
| `app/tools/filesystem.py` | `write_file`, `make_dir`, `delete_file` (additions to existing module) |
|
||||
| `app/tools/edit.py` | `str_replace` (unique-match required), `patch_apply` |
|
||||
| `app/tools/shell.py` | `run_command` with command allow/deny lists and output truncation |
|
||||
|
||||
**All write/shell operations gated through permissions service.**
|
||||
|
||||
**Exit criteria:** Agent can autonomously create files, edit code via string replacement, and run shell commands — all with user approval for destructive operations.
|
||||
|
||||
---
|
||||
|
||||
## Phase 7 — Polish and Hardening
|
||||
|
||||
Production-readiness: error handling, resource limits, and documentation.
|
||||
|
||||
| Area | Description |
|
||||
|------|-------------|
|
||||
| Error handling | Recovery from malformed tool calls, LLM errors, network timeouts in agent loop |
|
||||
| Token budget | Conversation truncation or summarization when approaching context limit |
|
||||
| Graceful shutdown | Clean Ctrl+C handling, session state preservation |
|
||||
| Testing | End-to-end integration tests (`tests/integration/`), unit tests (`tests/unit/`) |
|
||||
| Documentation | `README.md` with setup and usage instructions, `docs/tools.md` tool reference |
|
||||
|
||||
**Exit criteria:** Agent handles edge cases gracefully, tests pass, and a new user can set up and use the project from the README alone.
|
||||
|
||||
---
|
||||
|
||||
## File Coverage
|
||||
|
||||
Every file from the project structure in CLAUDE.md is accounted for:
|
||||
|
||||
| File | Phase |
|
||||
|------|-------|
|
||||
| `app/main.py` | 1, 2 |
|
||||
| `app/models/config.py` | 1 |
|
||||
| `app/models/message.py` | 1 |
|
||||
| `app/models/tool_call.py` | 1 |
|
||||
| `app/utils/logging.py` | 1 |
|
||||
| `app/utils/display.py` | 1, 2 |
|
||||
| `app/utils/file_helpers.py` | 1 |
|
||||
| `app/utils/token_counter.py` | 1 |
|
||||
| `app/agent/context.py` | 2 |
|
||||
| `app/services/llm.py` | 3 |
|
||||
| `app/services/streaming.py` | 3 |
|
||||
| `app/tools/base.py` | 4 |
|
||||
| `app/tools/registry.py` | 4 |
|
||||
| `app/services/permissions.py` | 4 |
|
||||
| `app/tools/filesystem.py` | 4, 6 |
|
||||
| `app/tools/search.py` | 4 |
|
||||
| `app/agent/loop.py` | 5 |
|
||||
| `app/tools/edit.py` | 6 |
|
||||
| `app/tools/shell.py` | 6 |
|
||||
1802
docs/superpowers/plans/2026-03-11-textual-tui.md
Normal file
1802
docs/superpowers/plans/2026-03-11-textual-tui.md
Normal file
File diff suppressed because it is too large
Load Diff
192
docs/superpowers/specs/2026-03-11-textual-tui-design.md
Normal file
192
docs/superpowers/specs/2026-03-11-textual-tui-design.md
Normal file
@@ -0,0 +1,192 @@
|
||||
# Textual TUI Redesign — Design Spec
|
||||
|
||||
## Overview
|
||||
|
||||
Replace the current sequential print-and-scroll terminal UI with a full persistent split-screen TUI using Textual. Input is pinned at the bottom, scrollable message history above, with a header showing app/model info and a footer showing token usage and iteration count.
|
||||
|
||||
## Layout
|
||||
|
||||
```
|
||||
+------------------- Header --------------------+
|
||||
| SneakyCode qwen2.5-coder:32b |
|
||||
+-----------------------------------------------+
|
||||
| |
|
||||
| +--- You ---+ |
|
||||
| | prompt | <- RichLog widget |
|
||||
| +-----------+ (handles own scrolling) |
|
||||
| |
|
||||
| Thinking... |
|
||||
| |
|
||||
| +-- Assistant --+ |
|
||||
| | response... | |
|
||||
| +---------------+ |
|
||||
| |
|
||||
| > read_file README.md -- 148 lines, 5128 ch |
|
||||
| > grep_files "pattern" -- 3 matches |
|
||||
| |
|
||||
+-----------------------------------------------+
|
||||
| Tokens: ~1,511 / 32,000 | Iteration 5/25 | <- StatusBar
|
||||
+-----------------------------------------------+
|
||||
| > [input cursor] | <- Input widget
|
||||
+-----------------------------------------------+
|
||||
```
|
||||
|
||||
**Widget hierarchy (no VerticalScroll wrapper — RichLog handles its own scrolling):**
|
||||
- `Header` — Textual built-in, title="SneakyCode", subtitle=model name
|
||||
- `RichLog` (id="chat-log") — main scroll area, accepts Rich renderables via `.write()`
|
||||
- `StreamingStatic` — persistent hidden `Static` widget, shown/hidden during streaming (avoids mount/unmount overhead)
|
||||
- `StatusBar` — custom `Static` widget, 1 row, docked above Input
|
||||
- `Input` — Textual built-in, pinned at bottom
|
||||
|
||||
## New Files
|
||||
|
||||
### `app/ui/app.py` — Textual App
|
||||
|
||||
SneakyCodeApp subclasses `textual.app.App`. Responsibilities:
|
||||
|
||||
- `compose()` yields: Header, RichLog(id="chat-log"), StreamingStatic(id="streaming"), StatusBar(id="status"), Input
|
||||
- `on_input_submitted()` handler: reads input value, clears input, writes user panel to chat log, dispatches agent turn as a worker
|
||||
- Agent turn runs via `run_worker()` (async worker, NOT threaded) so the UI stays responsive. Since the worker is async and on the event loop, widget methods can be called directly — no `call_from_thread()` needed.
|
||||
- Slash commands (/quit, /history, /clear, /save, /session) parsed from input before dispatching to agent
|
||||
- Holds references to config, SessionContext, AgentLoop (created in `on_mount`)
|
||||
- Header subtitle set to model name from config
|
||||
- `on_worker_state_changed()` handler: catches worker errors and writes error panels to RichLog
|
||||
- Ctrl+C binding: cancels the running agent worker (does NOT quit the app). A second Ctrl+C or `/quit` exits.
|
||||
|
||||
### `app/ui/widgets.py` — Custom Widgets
|
||||
|
||||
**StatusBar** — A simple `Static` widget styled as a footer bar. Displays token usage and iteration count. Updated by the agent loop after each LLM step via `status_bar.update(renderable)`.
|
||||
|
||||
**StreamingStatic** — A `Static` widget that stays mounted but hidden. During streaming, it becomes visible and receives `update()` calls with partial content. When streaming ends, it is hidden and its content is cleared. This avoids the overhead of mounting/unmounting on every LLM response.
|
||||
|
||||
### `app/ui/styles.tcss` — Textual CSS
|
||||
|
||||
Layout rules:
|
||||
- RichLog fills available height (fraction-based sizing, e.g. `height: 1fr`)
|
||||
- StreamingStatic: `display: none` by default, shown during streaming
|
||||
- StatusBar is 1 row, docked bottom above Input
|
||||
- Input is 1 row, docked at very bottom
|
||||
- Color scheme matches existing SNEAKYCODE_THEME (cyan for user, green for assistant, magenta for tools, dim for metadata)
|
||||
|
||||
## Modified Files
|
||||
|
||||
### `app/main.py`
|
||||
|
||||
- Remove `_run_repl()` async function entirely
|
||||
- Remove `console.input()` usage
|
||||
- `main()` creates config, runs preflight via `asyncio.run(_preflight(config))` (before Textual starts — this is fine, separate event loop), then instantiates and runs `SneakyCodeApp(config).run()`
|
||||
- CLI arg parsing stays (--config, -v, --log-file)
|
||||
- Session resume: `_offer_session_resume()` moves into `SneakyCodeApp.on_mount()` — instead of `console.input()`, push a modal screen asking "Resume previous session? [y/n]" with button/key handlers
|
||||
- Auto-save: triggers after each agent turn completes (in the worker completion handler)
|
||||
- SIGTERM handler: removed — Textual manages its own signal handling and shutdown lifecycle
|
||||
|
||||
### `app/services/streaming.py`
|
||||
|
||||
- Remove `from rich.live import Live` and `from rich.spinner import Spinner`
|
||||
- `process_stream()` no longer creates a `Rich.Live` context
|
||||
- Instead, accepts callback parameters:
|
||||
- `on_content: Callable[[str], None]` — called with accumulated content on each content chunk
|
||||
- `on_thinking: Callable[[], None]` — called once when first reasoning token arrives
|
||||
- `on_done: Callable[[], None]` — called when streaming completes
|
||||
- **Throttling:** Content callback fires at most every 100ms (track last update time, skip intermediate chunks). Final content always fires on stream end.
|
||||
- Since the agent runs as an async worker (on the event loop), callbacks can directly call widget methods — no `call_from_thread()` needed.
|
||||
- All accumulation and tool-call parsing logic stays identical
|
||||
|
||||
### `app/utils/display.py`
|
||||
|
||||
- All `print_*` functions become `render_*` functions that return Rich renderables:
|
||||
- `render_user_message(content) -> Panel`
|
||||
- `render_assistant_message(content) -> Panel`
|
||||
- `render_tool_call(name, args) -> Text`
|
||||
- `render_tool_result(name, output, is_error) -> Text`
|
||||
- `render_iteration_header(iteration, max_iter) -> Text`
|
||||
- `render_warning(message) -> Text`
|
||||
- `render_error(message) -> Text`
|
||||
- `print_banner()` removed — Header widget replaces it
|
||||
- `print_token_usage()` becomes `render_token_usage() -> Text` for the StatusBar
|
||||
- `print_history()` becomes `render_history() -> Table` — written to RichLog, may need width constraints for narrow terminals
|
||||
- A `DisplayAdapter` class wraps a `RichLog` reference and provides `write_user_message()`, `write_tool_call()`, etc. methods that call `render_*` then `rich_log.write()`
|
||||
|
||||
### `app/agent/loop.py`
|
||||
|
||||
- `AgentLoop.__init__()` accepts a `DisplayAdapter` instead of calling `display.py` print functions directly
|
||||
- All display calls route through the adapter: `self._display.write_tool_call(name, args)`, `self._display.write_iteration_header(i, max)`, etc.
|
||||
- `_execute_tool_calls()` becomes `async def _execute_tool_calls()` to support async permission checks
|
||||
- The loop logic (ReAct pattern, retry, truncation) is unchanged
|
||||
|
||||
### `app/services/permissions.py`
|
||||
|
||||
- `PermissionsService.check()` becomes `async def check()`
|
||||
- Instead of `rich.prompt.Confirm.ask()` (blocking stdin read), it:
|
||||
1. Creates an `asyncio.Event`
|
||||
2. Posts a custom message to the app requesting a permission modal
|
||||
3. The app pushes a modal screen with the permission question and approve/deny buttons
|
||||
4. When the user responds, the modal sets the event and stores the result
|
||||
5. `check()` awaits the event and reads the result
|
||||
- Edge cases: dismiss without choosing = deny. Ctrl+C during modal = deny. Focus returns to Input after modal dismisses.
|
||||
|
||||
### `app/utils/logging.py`
|
||||
|
||||
- **Critical change:** The shared `console = Console()` instance will corrupt the Textual display since Textual takes exclusive terminal control
|
||||
- When running under Textual: disable `RichHandler` (console handler), keep only the file handler
|
||||
- Add a `setup_logging_for_tui()` function that reconfigures logging to file-only mode
|
||||
- Called from `SneakyCodeApp.on_mount()` before any agent work begins
|
||||
- The `console` object still exists but should not be used for output during TUI mode — all output goes through the DisplayAdapter
|
||||
- Consider: `--log-file` becomes required (or auto-set to a default) when running in TUI mode, so logs are not lost
|
||||
|
||||
## Unchanged Files
|
||||
|
||||
- `app/services/llm.py` — HTTP client, SSE parsing untouched
|
||||
- `app/agent/context.py` — session state untouched
|
||||
- `app/models/*` — all data models untouched
|
||||
- `app/tools/*` — all tool implementations untouched
|
||||
- `app/utils/file_helpers.py` — path safety untouched
|
||||
- `app/utils/token_counter.py` — token counting untouched
|
||||
|
||||
## Key Patterns
|
||||
|
||||
### Streaming in Textual
|
||||
|
||||
The agent loop runs as an async worker (on the event loop, NOT threaded). During streaming:
|
||||
|
||||
1. App shows `StreamingStatic` widget, writes "Thinking..." initially
|
||||
2. Worker calls `StreamHandler.process_stream(chunks, on_content=..., on_thinking=..., on_done=...)`
|
||||
3. `on_content` callback: updates `StreamingStatic` with `Panel(Markdown(partial_content), title="Assistant", border_style="green")` — throttled to ~100ms intervals
|
||||
4. `on_done` callback: hides `StreamingStatic`, writes final content to `RichLog` via `DisplayAdapter`
|
||||
|
||||
Since the worker is async (not threaded), callbacks run on the event loop and can call widget methods directly.
|
||||
|
||||
### Permission Prompts
|
||||
|
||||
1. Agent loop (in async worker) calls `await permissions.check(operation, details)`
|
||||
2. `check()` creates an `asyncio.Event` and posts `PermissionRequest` message to the app
|
||||
3. App handles `PermissionRequest`: pushes a modal screen with the question, approve/deny buttons
|
||||
4. Modal screen: on button press, stores result and sets the event
|
||||
5. `check()` awaits the event, reads result, returns approved/denied
|
||||
6. Focus management: Input loses focus when modal appears, regains focus when modal dismisses
|
||||
7. Default on dismiss/Ctrl+C: deny
|
||||
|
||||
### Cancellation
|
||||
|
||||
- Ctrl+C (first press): cancels the running agent worker via `worker.cancel()`. The agent loop should check for cancellation between iterations.
|
||||
- Ctrl+C (second press) or `/quit`: exits the app via `app.exit()`
|
||||
|
||||
## Dependencies
|
||||
|
||||
- Add `textual>=4.0.0` to pyproject.toml dependencies
|
||||
|
||||
## Verification
|
||||
|
||||
1. Run the app — header shows app name + model, no console corruption
|
||||
2. Type a prompt — user panel appears in scroll area, input clears
|
||||
3. During LLM streaming — assistant response types out live (throttled) in the scroll area
|
||||
4. Thinking indicator shows during reasoning-only phases
|
||||
5. Tool calls appear as compact lines in the scroll area
|
||||
6. Footer shows token usage and iteration count, updating each step
|
||||
7. Scroll area auto-scrolls to bottom on new content
|
||||
8. /quit, /clear, /history commands work from the input
|
||||
9. Permission prompts show as modal, approve/deny work, focus returns to input
|
||||
10. Ctrl+C cancels running agent turn without quitting
|
||||
11. Worker errors display as error panels in the scroll area
|
||||
12. Logging goes to file only — no console corruption
|
||||
13. Session resume works on startup via modal dialog
|
||||
@@ -1 +1,12 @@
|
||||
Pressing Up should cycle through input history, like Claude Code.
|
||||
# UI Issues
|
||||
On `/clear`, we need to reset the token counter in the header panel.
|
||||
|
||||
# Bugs
|
||||
|
||||
# Improvements
|
||||
Add `-p` to the command-line args so that the agent can run the prompt and return data directly via STDOUT.
|
||||
|
||||
# Open questions:
|
||||
How might we pass a directory to this app and have it use that directory as its workspace, so I don't have to copy files or do odd things to work in other directories?
|
||||
|
||||
How do we keep huge files from taking up so many tokens?
|
||||
@@ -21,10 +21,10 @@ from app.utils.display import (
|
||||
|
||||
|
||||
class TestRenderFunctions:
|
||||
def test_render_user_message_returns_panel(self) -> None:
|
||||
def test_render_user_message_returns_text(self) -> None:
|
||||
result = render_user_message("hello")
|
||||
assert isinstance(result, Panel)
|
||||
assert result.title == "You"
|
||||
assert isinstance(result, Text)
|
||||
assert "hello" in result.plain
|
||||
|
||||
def test_render_assistant_message_returns_panel(self) -> None:
|
||||
result = render_assistant_message("response")
|
||||
@@ -72,7 +72,7 @@ class TestDisplayAdapter:
|
||||
adapter.write_user_message("hello")
|
||||
mock_log.write.assert_called_once()
|
||||
arg = mock_log.write.call_args[0][0]
|
||||
assert isinstance(arg, Panel)
|
||||
assert isinstance(arg, Text)
|
||||
|
||||
def test_write_tool_call(self) -> None:
|
||||
mock_log = MagicMock()
|
||||
|
||||
314
tests/unit/test_file_cache.py
Normal file
314
tests/unit/test_file_cache.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""Tests for the file cache with LRU eviction and mtime invalidation."""
|
||||
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.config import AppConfig, load_config
|
||||
from app.models.tool_call import ToolResultStatus
|
||||
from app.tools.filesystem import ReadFileTool, ReadManyFilesTool
|
||||
from app.utils.file_cache import CacheStats, FileCache, cached_read_file
|
||||
from app.utils.file_helpers import BinaryFileError, FileSizeError, PathSecurityError
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FileCache unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFileCache:
|
||||
def test_put_and_get_roundtrip(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "hello.txt"
|
||||
f.write_text("hello world")
|
||||
|
||||
cache.put(f, "hello world")
|
||||
assert cache.get(f) == "hello world"
|
||||
|
||||
def test_get_returns_none_for_missing_key(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
assert cache.get(tmp_path / "nope.txt") is None
|
||||
|
||||
def test_mtime_change_causes_miss(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "data.txt"
|
||||
f.write_text("v1")
|
||||
cache.put(f, "v1")
|
||||
|
||||
# Mutate the file so mtime changes
|
||||
time.sleep(0.05) # ensure mtime differs
|
||||
f.write_text("v2")
|
||||
|
||||
assert cache.get(f) is None # stale → miss
|
||||
assert cache.stats.invalidations == 1
|
||||
|
||||
def test_lru_eviction_at_capacity(self, tmp_path: Path) -> None:
|
||||
cache = FileCache(max_entries=3)
|
||||
files = []
|
||||
for i in range(4):
|
||||
f = tmp_path / f"f{i}.txt"
|
||||
f.write_text(f"content-{i}")
|
||||
files.append(f)
|
||||
|
||||
# Fill cache to capacity
|
||||
for f in files[:3]:
|
||||
cache.put(f, f.read_text())
|
||||
assert len(cache) == 3
|
||||
|
||||
# Adding a 4th evicts the LRU (files[0])
|
||||
cache.put(files[3], files[3].read_text())
|
||||
assert len(cache) == 3
|
||||
assert cache.get(files[0]) is None # evicted
|
||||
assert cache.stats.evictions == 1
|
||||
|
||||
# files[1..3] still present
|
||||
for f in files[1:]:
|
||||
assert cache.get(f) is not None
|
||||
|
||||
def test_invalidate_removes_entry(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "rm.txt"
|
||||
f.write_text("bye")
|
||||
cache.put(f, "bye")
|
||||
assert len(cache) == 1
|
||||
|
||||
cache.invalidate(f)
|
||||
assert len(cache) == 0
|
||||
assert cache.get(f) is None
|
||||
assert cache.stats.invalidations == 1
|
||||
|
||||
def test_invalidate_noop_for_missing(self) -> None:
|
||||
cache = FileCache()
|
||||
cache.invalidate(Path("/nonexistent"))
|
||||
assert cache.stats.invalidations == 0
|
||||
|
||||
def test_clear_empties_cache(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
for i in range(5):
|
||||
f = tmp_path / f"c{i}.txt"
|
||||
f.write_text(str(i))
|
||||
cache.put(f, str(i))
|
||||
assert len(cache) == 5
|
||||
|
||||
cache.clear()
|
||||
assert len(cache) == 0
|
||||
|
||||
def test_stats_accuracy(self, tmp_path: Path) -> None:
|
||||
cache = FileCache(max_entries=2)
|
||||
a = tmp_path / "a.txt"
|
||||
b = tmp_path / "b.txt"
|
||||
c = tmp_path / "c.txt"
|
||||
a.write_text("a")
|
||||
b.write_text("b")
|
||||
c.write_text("c")
|
||||
|
||||
# Miss
|
||||
cache.get(a)
|
||||
assert cache.stats.misses == 1
|
||||
assert cache.stats.hits == 0
|
||||
|
||||
# Put + hit
|
||||
cache.put(a, "a")
|
||||
cache.get(a)
|
||||
assert cache.stats.hits == 1
|
||||
|
||||
# Fill + evict
|
||||
cache.put(b, "b")
|
||||
cache.put(c, "c") # evicts a
|
||||
assert cache.stats.evictions == 1
|
||||
|
||||
def test_hit_rate(self) -> None:
|
||||
stats = CacheStats(hits=3, misses=1)
|
||||
assert stats.hit_rate == pytest.approx(0.75)
|
||||
|
||||
def test_hit_rate_zero_total(self) -> None:
|
||||
stats = CacheStats()
|
||||
assert stats.hit_rate == 0.0
|
||||
|
||||
def test_file_deleted_after_caching(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "gone.txt"
|
||||
f.write_text("here")
|
||||
cache.put(f, "here")
|
||||
|
||||
f.unlink()
|
||||
assert cache.get(f) is None # stat fails → miss
|
||||
|
||||
def test_put_skips_when_stat_fails(self) -> None:
|
||||
cache = FileCache()
|
||||
cache.put(Path("/totally/nonexistent"), "data")
|
||||
assert len(cache) == 0
|
||||
|
||||
def test_get_moves_to_end(self, tmp_path: Path) -> None:
|
||||
"""Accessing an entry makes it most-recently-used, protecting from eviction."""
|
||||
cache = FileCache(max_entries=3)
|
||||
files = []
|
||||
for i in range(3):
|
||||
f = tmp_path / f"lru{i}.txt"
|
||||
f.write_text(f"c{i}")
|
||||
files.append(f)
|
||||
cache.put(f, f"c{i}")
|
||||
|
||||
# Touch files[0] to make it MRU
|
||||
cache.get(files[0])
|
||||
|
||||
# Add a new entry — files[1] (LRU) should be evicted, not files[0]
|
||||
extra = tmp_path / "extra.txt"
|
||||
extra.write_text("x")
|
||||
cache.put(extra, "x")
|
||||
|
||||
assert cache.get(files[0]) is not None # protected by access
|
||||
assert cache.get(files[1]) is None # evicted
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# cached_read_file tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCachedReadFile:
|
||||
def test_without_cache_matches_safe_read(self, tmp_path: Path) -> None:
|
||||
f = tmp_path / "plain.txt"
|
||||
f.write_text("hello")
|
||||
content = cached_read_file(f, tmp_path, cache=None)
|
||||
assert content == "hello"
|
||||
|
||||
def test_populates_on_miss_returns_on_hit(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "cached.txt"
|
||||
f.write_text("data")
|
||||
|
||||
# First call: miss → read from disk → populate cache
|
||||
content1 = cached_read_file(f, tmp_path, cache=cache)
|
||||
assert content1 == "data"
|
||||
assert cache.stats.misses == 1
|
||||
assert cache.stats.hits == 0
|
||||
|
||||
# Second call: hit → from cache
|
||||
content2 = cached_read_file(f, tmp_path, cache=cache)
|
||||
assert content2 == "data"
|
||||
assert cache.stats.hits == 1
|
||||
|
||||
def test_security_checks_run_on_cached_path(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
with pytest.raises(PathSecurityError):
|
||||
cached_read_file("/etc/passwd", tmp_path, cache=cache)
|
||||
|
||||
def test_binary_check_runs_on_cached_path(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "bin.dat"
|
||||
f.write_bytes(b"\x00binary\x00")
|
||||
with pytest.raises(BinaryFileError):
|
||||
cached_read_file(f, tmp_path, cache=cache)
|
||||
|
||||
def test_size_check_runs_on_cached_path(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
f = tmp_path / "big.txt"
|
||||
f.write_text("x" * 200)
|
||||
|
||||
# First read populates cache
|
||||
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
|
||||
|
||||
# Now make file too big on disk — security check should catch it
|
||||
# even though content is cached
|
||||
f.write_text("x" * 2000)
|
||||
with pytest.raises(FileSizeError):
|
||||
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
|
||||
|
||||
def test_file_not_found(self, tmp_path: Path) -> None:
|
||||
cache = FileCache()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
cached_read_file(tmp_path / "nope.txt", tmp_path, cache=cache)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool-level cache-hit dedup tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config() -> AppConfig:
|
||||
return load_config()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
|
||||
config.agent.workspace_root = tmp_path
|
||||
return tmp_path, config
|
||||
|
||||
|
||||
class TestReadFileToolCacheHit:
|
||||
def test_first_read_returns_full_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
|
||||
ws, cfg = tmp_workspace
|
||||
cache = FileCache()
|
||||
(ws / "hello.txt").write_text("hello world")
|
||||
|
||||
tool = ReadFileTool(ws, cfg, file_cache=cache)
|
||||
result = tool.run("tc-1", {"file_path": "hello.txt"})
|
||||
assert result.status == ToolResultStatus.SUCCESS
|
||||
assert result.output == "hello world"
|
||||
|
||||
def test_second_read_returns_cached_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
|
||||
ws, cfg = tmp_workspace
|
||||
cache = FileCache()
|
||||
(ws / "hello.txt").write_text("hello world")
|
||||
|
||||
tool = ReadFileTool(ws, cfg, file_cache=cache)
|
||||
tool.run("tc-1", {"file_path": "hello.txt"})
|
||||
|
||||
result2 = tool.run("tc-2", {"file_path": "hello.txt"})
|
||||
assert result2.status == ToolResultStatus.SUCCESS
|
||||
assert "[Cached]" in result2.output
|
||||
assert "hello.txt" in result2.output
|
||||
assert "hello world" not in result2.output
|
||||
|
||||
def test_changed_file_returns_full_content_again(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
|
||||
ws, cfg = tmp_workspace
|
||||
cache = FileCache()
|
||||
f = ws / "data.txt"
|
||||
f.write_text("v1")
|
||||
|
||||
tool = ReadFileTool(ws, cfg, file_cache=cache)
|
||||
tool.run("tc-1", {"file_path": "data.txt"})
|
||||
|
||||
# Mutate file so mtime changes
|
||||
time.sleep(0.05)
|
||||
f.write_text("v2")
|
||||
|
||||
result2 = tool.run("tc-2", {"file_path": "data.txt"})
|
||||
assert result2.status == ToolResultStatus.SUCCESS
|
||||
assert result2.output == "v2"
|
||||
assert "[Cached]" not in result2.output
|
||||
|
||||
def test_no_cache_always_returns_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
|
||||
ws, cfg = tmp_workspace
|
||||
(ws / "hello.txt").write_text("hello")
|
||||
|
||||
tool = ReadFileTool(ws, cfg, file_cache=None)
|
||||
r1 = tool.run("tc-1", {"file_path": "hello.txt"})
|
||||
r2 = tool.run("tc-2", {"file_path": "hello.txt"})
|
||||
assert r1.output == "hello"
|
||||
assert r2.output == "hello"
|
||||
|
||||
|
||||
class TestReadManyFilesToolCacheHit:
|
||||
def test_cached_files_get_short_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
|
||||
ws, cfg = tmp_workspace
|
||||
cache = FileCache()
|
||||
(ws / "a.txt").write_text("alpha")
|
||||
(ws / "b.txt").write_text("bravo")
|
||||
|
||||
tool = ReadManyFilesTool(ws, cfg, file_cache=cache)
|
||||
|
||||
# First read — full content
|
||||
r1 = tool.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})
|
||||
assert "alpha" in r1.output
|
||||
assert "bravo" in r1.output
|
||||
|
||||
# Second read — cached messages
|
||||
r2 = tool.run("tc-2", {"file_paths": ["a.txt", "b.txt"]})
|
||||
assert "[Cached]" in r2.output
|
||||
assert "alpha" not in r2.output
|
||||
assert "bravo" not in r2.output
|
||||
69
tests/unit/test_filesystem_read_many.py
Normal file
69
tests/unit/test_filesystem_read_many.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Tests for the read_many_files tool."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app.models.config import AppConfig, load_config
|
||||
from app.models.tool_call import ToolResultStatus
|
||||
from app.tools.filesystem import ReadManyFilesTool
|
||||
|
||||
|
||||
@pytest.fixture
def config() -> AppConfig:
    """Provide the ``AppConfig`` produced by calling ``load_config()`` with no arguments."""
    loaded: AppConfig = load_config()
    return loaded
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
    """Create a temporary workspace for read_many_files tests.

    Sets ``config.agent.workspace_root`` to ``tmp_path`` and returns the
    ``(tmp_path, config)`` pair.
    """
    workspace = tmp_path
    config.agent.workspace_root = workspace
    return (workspace, config)
|
||||
|
||||
|
||||
class TestReadManyFilesTool:
    """Behavioural tests for ``ReadManyFilesTool`` constructed without a file cache."""

    def test_read_multiple_files(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Reading two existing files succeeds and emits a header plus content for each."""
        workspace, app_config = tmp_workspace
        for file_name, text in (("a.txt", "alpha"), ("b.txt", "bravo")):
            (workspace / file_name).write_text(text)

        reader = ReadManyFilesTool(workspace, app_config)
        outcome = reader.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})

        assert outcome.status == ToolResultStatus.SUCCESS
        for fragment in ("=== a.txt ===", "alpha", "=== b.txt ===", "bravo"):
            assert fragment in outcome.output

    def test_partial_failure(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """One missing file still yields SUCCESS, with an inline error under its header."""
        workspace, app_config = tmp_workspace
        present = workspace / "exists.txt"
        present.write_text("hello")

        reader = ReadManyFilesTool(workspace, app_config)
        outcome = reader.run("tc-2", {"file_paths": ["exists.txt", "missing.txt"]})

        assert outcome.status == ToolResultStatus.SUCCESS
        for fragment in ("hello", "[ERROR]", "=== missing.txt ==="):
            assert fragment in outcome.output

    def test_all_files_fail(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """When no requested file can be read, the result status is ERROR."""
        workspace, app_config = tmp_workspace

        reader = ReadManyFilesTool(workspace, app_config)
        outcome = reader.run("tc-3", {"file_paths": ["no1.txt", "no2.txt"]})

        assert outcome.status == ToolResultStatus.ERROR
        # ``error`` may be None; fall back to an empty string before searching.
        message = outcome.error or ""
        assert "All files failed" in message

    def test_empty_file_paths(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """An empty ``file_paths`` list is an ERROR whose message mentions "empty"."""
        workspace, app_config = tmp_workspace

        reader = ReadManyFilesTool(workspace, app_config)
        outcome = reader.run("tc-4", {"file_paths": []})

        assert outcome.status == ToolResultStatus.ERROR
        # Case-insensitive match; ``error`` may be None.
        message = (outcome.error or "").lower()
        assert "empty" in message

    def test_path_security_inline_error(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """A path escaping the workspace is reported inline while the safe file is read."""
        workspace, app_config = tmp_workspace
        safe_file = workspace / "safe.txt"
        safe_file.write_text("ok")

        reader = ReadManyFilesTool(workspace, app_config)
        outcome = reader.run("tc-5", {"file_paths": ["safe.txt", "../../etc/passwd"]})

        assert outcome.status == ToolResultStatus.SUCCESS
        assert "ok" in outcome.output
        assert "[ERROR]" in outcome.output
        assert "outside" in outcome.output.lower()
|
||||
@@ -90,6 +90,6 @@ class TestRunCommandTool:
|
||||
# Create a file in the workspace to verify cwd
|
||||
(ws / "marker.txt").write_text("found")
|
||||
tool = RunCommandTool(ws, cfg)
|
||||
result = tool.run("tc-9", {"command": "cat marker.txt"})
|
||||
result = tool.run("tc-9", {"command": "head marker.txt"})
|
||||
assert result.status == ToolResultStatus.SUCCESS
|
||||
assert "found" in result.output
|
||||
|
||||
@@ -108,7 +108,7 @@ class TestToolRegistry:
|
||||
registry = create_default_registry(workspace, config)
|
||||
names = set(registry.get_all().keys())
|
||||
assert names == {
|
||||
"read_file", "list_dir", "grep_files", "find_files",
|
||||
"read_file", "read_many_files", "list_dir", "grep_files", "find_files",
|
||||
"write_file", "make_dir", "delete_file",
|
||||
"str_replace", "patch_apply",
|
||||
"run_command",
|
||||
@@ -118,7 +118,7 @@ class TestToolRegistry:
|
||||
def test_schema_export(self, workspace: Path, config: AppConfig) -> None:
|
||||
registry = create_default_registry(workspace, config)
|
||||
schemas = registry.get_openai_tools_schema()
|
||||
assert len(schemas) == 11
|
||||
assert len(schemas) == 12
|
||||
assert all(s["type"] == "function" for s in schemas)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user